import json import shutil from collections import defaultdict from pathlib import Path import numpy as np from pyproj import Transformer from shapely import STRtree, make_valid, set_precision from shapely.errors import GEOSException from shapely.geometry import MultiPolygon, Polygon, box, mapping, shape from shapely.ops import transform as transform_geometry from tqdm import tqdm from .geometry import safe_difference, safe_union _to_wgs84 = None def _get_to_wgs84(): global _to_wgs84 if _to_wgs84 is None: _to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True) return _to_wgs84 def _largest_polygonal(geom) -> Polygon | None: if geom is None or geom.is_empty: return None if not geom.is_valid: geom = make_valid(geom) if geom.geom_type == "Polygon": return geom if geom.geom_type == "MultiPolygon": return max(geom.geoms, key=lambda g: g.area) if geom.geom_type == "GeometryCollection": polygons = [ polygon for part in geom.geoms if (polygon := _largest_polygonal(part)) is not None ] if polygons: return max(polygons, key=lambda g: g.area) return None # Output coordinate grid (~0.11 m at UK latitudes). Polygons whose extent is # below this in any direction snap to empty during serialization. _OUTPUT_PRECISION_DEG = 0.000001 # Minimal BNG buffer used to rescue sub-grid slivers into a representable # footprint. A near-zero-area Voronoi/INSPIRE spike (e.g. three almost-collinear # vertices) would otherwise vanish at output precision; since every *active* # postcode must keep a boundary (validate_outputs enforces this with zero # tolerance), we fatten it just enough to survive snapping rather than drop it. _MIN_FOOTPRINT_BUFFER_M = 0.5 def _snap_to_wgs84_geojson(geom_bng: Polygon | MultiPolygon) -> dict | None: """Transform a BNG polygon to WGS84, snap to output precision, validate. Validates the *serialized* GeoJSON dict (via a ``shape()`` round-trip), not just the intermediate Shapely object: coordinate snapping during serialization can otherwise leave a self-intersecting ring that only shows up once the feature is read back from disk. Returns ``None`` if the geometry collapses to empty (a sub-grid sliver). """ transformer = _get_to_wgs84() wgs84 = transform_geometry(transformer.transform, geom_bng) try: wgs84 = set_precision(wgs84, _OUTPUT_PRECISION_DEG, mode="valid_output") except GEOSException: # Precision snapping can fail on pathological geometries; fall back to a # plain validity repair without coordinate snapping. wgs84 = make_valid(wgs84) wgs84 = _largest_polygonal(wgs84) if wgs84 is None: return None geojson_dict = mapping(wgs84) # The geometry that actually reaches disk is the GeoJSON dict, so validate # *that* (not the pre-serialization object) and repair if needed. round_trip = shape(geojson_dict) if round_trip.is_empty or not round_trip.is_valid: round_trip = _largest_polygonal(make_valid(round_trip)) if round_trip is None or round_trip.is_empty: return None geojson_dict = mapping(round_trip) return geojson_dict def _rescue_footprint(geom_bng) -> dict | None: """Fatten a degenerate BNG geometry into a representable footprint and snap.""" footprint = _largest_polygonal(geom_bng.buffer(_MIN_FOOTPRINT_BUFFER_M)) if footprint is None: return None return _snap_to_wgs84_geojson(footprint) def to_wgs84_geojson( geom: Polygon | MultiPolygon, tolerance: float = 1.0 ) -> dict | None: """Simplify geometry in BNG, convert to WGS84, return a valid GeoJSON dict. A few thousand postcodes reduce to a sub-grid sliver that snaps to empty at output precision. Dropping them would leave an active postcode with no boundary (validate_outputs rejects that with zero tolerance), so instead they are fattened into a minimal footprint at the right location: first by buffering the (often elongated) sliver itself, then -- for fully-degenerate input -- a small disc around ``representative_point()``, which lies inside any non-empty geometry. ``None`` is returned only for a genuinely empty input. """ if geom is None or geom.is_empty: return None cleaned = _largest_polygonal(geom) if cleaned is not None: simplified = _largest_polygonal( cleaned.simplify(tolerance, preserve_topology=True) ) if simplified is None: simplified = cleaned # Normal path; if snapping erases a thin sliver, fatten its real shape. result = _snap_to_wgs84_geojson(simplified) if result is None: result = _rescue_footprint(simplified) if result is not None: return result # Universal fallback for input too degenerate to clean or fatten in place. return _rescue_footprint(geom.representative_point()) def to_wgs84_geojson_multi( geom: Polygon | MultiPolygon, tolerance: float = 1.0 ) -> dict | None: """Convert a (possibly multi-part) postcode geometry to a GeoJSON dict, preserving every part. Each part is simplified/snapped/rescued independently via :func:`to_wgs84_geojson`; the result is a ``Polygon`` for a single part or a ``MultiPolygon`` for several. ``None`` only if every part is degenerate. """ parts = list(geom.geoms) if geom.geom_type == "MultiPolygon" else [geom] part_dicts = [d for part in parts if (d := to_wgs84_geojson(part, tolerance))] if not part_dicts: return None if len(part_dicts) == 1: return part_dicts[0] return { "type": "MultiPolygon", "coordinates": [pd["coordinates"] for pd in part_dicts], } # Interior holes from the INSPIRE+Voronoi+make_valid chain are small artifacts and # get filled. A hole at least this large is likely a genuinely enclosed postcode # (kept, so we never solidify over a neighbour); the de-overlap pass is the real # guarantee, this is defence-in-depth. _MAX_ARTIFACT_HOLE_AREA = 1000.0 def _fill_small_holes(poly: Polygon) -> Polygon: kept = [r for r in poly.interiors if Polygon(r).area >= _MAX_ARTIFACT_HOLE_AREA] return Polygon(poly.exterior, kept) def _fill_holes(geom): """Fill small artifact interior rings; keep large (real-enclosed) holes.""" if geom.geom_type == "Polygon": return _fill_small_holes(geom) elif geom.geom_type == "MultiPolygon": return MultiPolygon([_fill_small_holes(p) for p in geom.geoms]) return geom # A postcode genuinely split across an OA seam (by a railway, river, or main road # wider than the merge buffer) arrives here as a MultiPolygon. Keeping only the # largest part used to discard the rest, leaving ~1.8% of merged area as uncovered # gaps (often 3000-5000 m² building blocks). Keep every part at least this big; # smaller detached bits are Voronoi/clipping noise and are still dropped. _MIN_DETACHED_PART_AREA = 100.0 def _keep_polygon_parts(geom): """Keep all MultiPolygon parts >= _MIN_DETACHED_PART_AREA (largest if none).""" if geom.geom_type != "MultiPolygon": return geom parts = [g for g in geom.geoms if g.area >= _MIN_DETACHED_PART_AREA] if not parts: parts = [max(geom.geoms, key=lambda g: g.area)] return parts[0] if len(parts) == 1 else MultiPolygon(parts) def merge_fragments( all_fragments: list[tuple[str, Polygon | MultiPolygon]], greenspace_tree=None, greenspace_geoms=None, ) -> dict[str, Polygon | MultiPolygon]: """Merge cross-OA fragments for postcodes spanning multiple OAs. Args: all_fragments: List of (postcode, geometry) pairs. greenspace_tree: Optional STRtree of park/water polygons. greenspace_geoms: Optional list of park/water geometries (indexed by tree). """ by_postcode: dict[str, list] = defaultdict(list) for pc, geom in all_fragments: by_postcode[pc].append(geom) merged = {} for pc, parts in by_postcode.items(): combined = safe_union(parts) if combined.is_empty: continue if not combined.is_valid: combined = make_valid(combined) # Close tiny gaps between adjacent OA boundary edges (float mismatches). # The closing can erode a tiny MultiPolygon (e.g. a postcode with only a # sliver fragment) to nothing, which would leave the postcode with no # geometry at all — keep the un-closed shape if that happens. if combined.geom_type == "MultiPolygon": closed = combined.buffer(5.0).buffer(-5.0) if not closed.is_valid: closed = make_valid(closed) if not closed.is_empty: combined = closed # Keep the postcode whole: the largest part plus any other substantial # part (a genuine railway/river split), dropping only tiny noise slivers. combined = _keep_polygon_parts(combined) # Remove artifact interior holes from INSPIRE+Voronoi+make_valid chain combined = _fill_holes(combined) # Subtract parks/water if provided if greenspace_tree is not None and greenspace_geoms is not None: from .greenspace import subtract_greenspace pre_green = combined combined = subtract_greenspace(combined, greenspace_tree, greenspace_geoms) combined = _keep_polygon_parts(combined) # Do NOT _fill_holes here: interior holes carved by the greenspace # subtraction (lakes, enclosed parks) are intentional, not artifacts. # Filling them would re-add the removed area and negate the # subtraction. Artifact holes from the INSPIRE+Voronoi+make_valid # chain were already removed by the _fill_holes above (pre-subtraction). # Revert if subtraction + fragment selection lost >90% of area if pre_green.area > 0 and combined.area / pre_green.area < 0.1: combined = pre_green merged[pc] = combined return merged def _polygonal(geom): """Return only the polygonal part(s) of a geometry, or None if none remain.""" if geom is None or geom.is_empty: return None if geom.geom_type in ("Polygon", "MultiPolygon"): return geom if geom.geom_type == "GeometryCollection": polys = [ g for g in geom.geoms if g.geom_type in ("Polygon", "MultiPolygon") and not g.is_empty ] if not polys: return None # Both callers run on WGS84-degree output geometry, so the robustness # fallback snaps on the 1e-6° grid (~0.11 m), not geometry.py's metre # default — a coarse metre grid would obliterate a degree-scale shape. merged = safe_union(polys, grid=_OUTPUT_PRECISION_DEG) return merged if not merged.is_empty else None return None def _resolve_overlaps( items: list[tuple[str, Polygon | MultiPolygon]], ) -> list[tuple[str, Polygon | MultiPolygon]]: """Make the postcode polygons a partition: no two cover the same ground. Overlap appears at OA seams (the 5m merge buffer expands each postcode independently), from simplifying each postcode on its own, and as genuine containment (a postcode fully enclosed by another). Each postcode is trimmed by the union of its higher-priority overlapping neighbours, where **priority = ascending area**: a smaller postcode wins contested ground. That single rule handles both cases correctly — an enclosed postcode is always smaller than its container, so it keeps its area while the container gets a hole (a `overlaps` query alone would miss containment entirely). Run last, on the final output geometries, so nothing re-introduces overlap afterwards. A postcode that would be emptied keeps its original geometry, so an active postcode is never dropped. """ geoms = [g for _, g in items] n = len(geoms) if n < 2: return items # rank[i]: 0 = highest priority (smallest area). Postcode string breaks ties # for determinism. rank = { idx: r for r, idx in enumerate( sorted(range(n), key=lambda i: (geoms[i].area, items[i][0])) ) } tree = STRtree(geoms) arr = np.array(geoms, dtype=object) pairs: set[tuple[int, int]] = set() # "overlaps" gives partial overlaps; "contains" gives containment (which # "overlaps" excludes) — together they cover every 2-D overlap without the # edge-touch explosion a plain "intersects" query would add. for predicate in ("overlaps", "contains"): qsrc, qtgt = tree.query(arr, predicate=predicate) for s, t in zip(qsrc.tolist(), qtgt.tolist()): if s != t: pairs.add((s, t) if s < t else (t, s)) # For each loser (lower priority) the higher-priority neighbours to subtract. higher: dict[int, list[int]] = defaultdict(list) for a, b in pairs: winner, loser = (a, b) if rank[a] < rank[b] else (b, a) higher[loser].append(winner) out = list(geoms) # Process losers from highest priority down, so every subtracted neighbour is # already finalised. for i in sorted(higher, key=lambda idx: rank[idx]): # These geometries are WGS84 degrees already snapped to output precision, # so the robustness fallback snaps on the same 1e-6° grid (~0.11 m) rather # than geometry.py's metre-CRS default. A raw difference can still raise a # "side location conflict" on near-coincident OA-seam edges; the # fixed-precision overlay noded them away. cut = safe_union([out[j] for j in higher[i]], grid=_OUTPUT_PRECISION_DEG) trimmed = safe_difference(out[i], cut, grid=_OUTPUT_PRECISION_DEG) if not trimmed.is_valid: trimmed = make_valid(trimmed) # Keep all polygonal parts: these geometries are in WGS84 degrees, so an # area threshold here would wrongly drop everything but the largest part # and re-open the very gaps the seam fix closed. trimmed = _polygonal(trimmed) if trimmed is not None and not trimmed.is_empty: out[i] = trimmed return [(pc, out[i]) for i, (pc, _) in enumerate(items)] def _round_coords(coords, ndigits=6): if coords and isinstance(coords[0], (int, float)): return [round(coords[0], ndigits), round(coords[1], ndigits)] return [_round_coords(c, ndigits) for c in coords] def _snap_polygonal(geom, grid): """Re-node ``geom`` onto ``grid`` (``valid_output``) → polygonal-only, or None. ``set_precision`` ``valid_output`` runs an OverlayNG noding pass that places every vertex on a multiple of ``grid`` *and* fixes the topology, so a plain coordinate round of the result is exact (no two distinct vertices can land in the same cell). Falls back to ``make_valid`` if precision-reduction raises. """ try: snapped = set_precision(geom, grid, mode="valid_output") except GEOSException: snapped = make_valid(geom) return _polygonal(snapped if snapped.is_valid else make_valid(snapped)) # A square this many output-grid cells on a side, used as the last-resort # footprint when snapping erases a sub-grid sliver. ~10 cells (≈0.7–1.1 m at UK # latitudes) is invisible at map scale yet survives the 1e-6° snap as a valid, # 4-corner ring. _FOOTPRINT_GRID_CELLS = 5 def _grid_footprint(geom): """A tiny grid-aligned square at ``geom``'s representative point, snapped valid. Last line of defence so an *active* postcode never vanishes: the de-overlap pass can shave a small (e.g. co-located, non-geographic) postcode down to a sub-grid sliver that disappears when snapped to output precision. Rather than drop it, place a minimal valid footprint at its location. The tiny overlap this re-creates with the neighbour that trimmed it is harmless — the output partition is best-effort, a missing boundary is a hard validation failure. """ try: point = geom.representative_point() except GEOSException: return None half = _OUTPUT_PRECISION_DEG * _FOOTPRINT_GRID_CELLS square = box(point.x - half, point.y - half, point.x + half, point.y + half) return _snap_polygonal(square, _OUTPUT_PRECISION_DEG) def _geojson_geometry(geom) -> dict | None: """Serialize a WGS84 polygon/multipolygon to a *valid* 6dp GeoJSON dict, or None. The coordinates are snapped onto the 1e-6° output grid with a re-noding pass BEFORE the 6dp rounding, not by the round alone. ``_resolve_overlaps`` leaves thin overlap-sliver triangles with full-precision (off-grid) vertices at OA seams; a bare round-to-6dp collapses those into degenerate rings (GEOS "Too few points") and pinches rings into self-intersections that pass the pre-rounding validity check but fail once the feature is read back from disk. Snapping with ``valid_output`` nodes the geometry onto the grid so the round that follows lands on exact 1e-6 multiples and cannot reintroduce invalidity. ``None`` only for a genuinely empty/degenerate-with-no-location input; a non-empty geometry that snaps to a sub-grid sliver is rescued into a minimal grid footprint rather than dropped (an active postcode must keep a boundary). """ geom = _polygonal(geom if geom.is_valid else make_valid(geom)) if geom is None or geom.is_empty: return None snapped = _snap_polygonal(geom, _OUTPUT_PRECISION_DEG) if snapped is None or snapped.is_empty: snapped = _grid_footprint(geom) if snapped is None or snapped.is_empty: return None gj = mapping(snapped) out = {"type": gj["type"], "coordinates": _round_coords(gj["coordinates"])} # Defence-in-depth: re-validate the dict that actually reaches disk. Snapping # makes the round exact, so this should already hold; repair once more on the # grid if a pathological vertex still pinches a ring. if not shape(out).is_valid: snapped = _snap_polygonal(shape(out), _OUTPUT_PRECISION_DEG) if snapped is None or snapped.is_empty: return None gj = mapping(snapped) out = {"type": gj["type"], "coordinates": _round_coords(gj["coordinates"])} return out def write_district_geojson( postcodes: dict[str, Polygon | MultiPolygon], output_dir: Path ) -> int: """Group postcodes by district, write GeoJSON files. Returns file count. Before writing, the postcode polygons are converted to their final WGS84 form and made a partition (overlaps removed) so the output never has two postcodes covering the same ground. """ units_dir = output_dir / "units" tmp_units_dir = output_dir / "units.tmp" output_dir.mkdir(parents=True, exist_ok=True) if tmp_units_dir.exists(): shutil.rmtree(tmp_units_dir) tmp_units_dir.mkdir(parents=True) skipped: list[str] = [] # Pass 1: convert every postcode to its final WGS84 geometry (simplify, snap, # sliver-rescue, multi-part preserved). Sorted → deterministic de-overlap # priority. to_wgs84_geojson_multi returns None only for a genuinely empty # input, which is skipped and reported rather than aborting a multi-hour run. converted: list[tuple[str, Polygon | MultiPolygon]] = [] for pc in sorted(postcodes): gj = to_wgs84_geojson_multi(postcodes[pc]) if gj is None: skipped.append(pc) continue converted.append((pc, shape(gj))) # Remove overlap strips so the output is a clean partition. converted = _resolve_overlaps(converted) by_district: dict[str, list[tuple[str, Polygon | MultiPolygon]]] = defaultdict(list) for pc, geom in converted: parts = pc.split() district = parts[0] if parts else pc[:4] by_district[district].append((pc, geom)) file_count = 0 for district, entries in tqdm( sorted(by_district.items()), desc="Writing GeoJSON", unit="file" ): features = [] for pc, geom in sorted(entries, key=lambda x: x[0]): geojson_geom = _geojson_geometry(geom) if geojson_geom is None: skipped.append(pc) continue features.append( { "type": "Feature", "geometry": geojson_geom, "properties": { "postcodes": pc, "mapit_code": pc.replace(" ", ""), }, } ) if not features: continue collection = {"type": "FeatureCollection", "features": features} out_path = tmp_units_dir / f"{district}.geojson" with open(out_path, "w") as f: json.dump(collection, f, separators=(",", ":")) file_count += 1 if skipped: preview = ", ".join(skipped[:10]) suffix = " …" if len(skipped) > 10 else "" print( f" Skipped {len(skipped)} postcode(s) with degenerate (sub-grid) " f"geometry: {preview}{suffix}" ) if units_dir.exists(): shutil.rmtree(units_dir) tmp_units_dir.replace(units_dir) return file_count