"""Tests for the postcode_boundaries module. Each test targets a specific bug or edge case identified during code review. """ import json import numpy as np import polars as pl import pytest from shapely.geometry import MultiPolygon, Polygon, box from shapely.ops import unary_union from .fragments_cache import ( fragments_cache_is_fresh, load_fragments, save_fragments, ) from .__main__ import _oa_fragments, _process_oas from .inspire import build_inspire_index from .oa_boundaries import parse_gpkg_geometry from .greenspace import subtract_greenspace from .output import ( _fill_holes, merge_fragments, to_wgs84_geojson, to_wgs84_geojson_multi, write_district_geojson, ) from .process_oa import _extract_polygonal, process_oa from .uprn import get_oa_uprns, load_uprns from .voronoi import _equal_split_fallback, compute_voronoi_regions # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def square_boundary(): """A 100x100m square OA boundary in BNG coords.""" return box(500000, 180000, 500100, 180100) @pytest.fixture def uprn_parquet(tmp_path): """Write a minimal UPRN parquet file with 3 OAs, return its path.""" df = pl.DataFrame( { "GRIDGB1E": [500010, 500020, 500030, 500040, 500050, 500060], "GRIDGB1N": [180010, 180020, 180030, 180040, 180050, 180060], "PCDS": ["AA1 1AA", "AA1 1AB", "BB2 2BB", "BB2 2BC", "CC3 3CC", "CC3 3CD"], "OA21CD": [ "E00000001", "E00000001", "E00000002", "E00000002", "E00000003", "E00000003", ], } ) path = tmp_path / "uprn.parquet" df.write_parquet(path) return path # --------------------------------------------------------------------------- # Bug 1: First OA silently dropped # --------------------------------------------------------------------------- class TestFirstOADropped: """The UPRN offset computation drops the first OA (alphabetically).""" def test_first_oa_present_in_offsets(self, uprn_parquet): """E00000001 is the first OA after sorting. It must appear in offsets.""" df, offsets = load_uprns(uprn_parquet) assert "E00000001" in offsets, ( "First OA (E00000001) missing from offsets — shift(1) null comparison bug" ) def test_all_oas_present(self, uprn_parquet): """Every OA in the data must have an offset entry.""" df, offsets = load_uprns(uprn_parquet) assert set(offsets.keys()) == {"E00000001", "E00000002", "E00000003"} def test_first_oa_data_accessible(self, uprn_parquet): """UPRNs for the first OA must be retrievable via get_oa_uprns.""" df, offsets = load_uprns(uprn_parquet) points, postcodes = get_oa_uprns(df, offsets, "E00000001") assert len(postcodes) == 2 assert set(postcodes) == {"AA1 1AA", "AA1 1AB"} # --------------------------------------------------------------------------- # Bug 2: Whitespace-only postcodes slip through # --------------------------------------------------------------------------- class TestWhitespacePostcodes: """Postcodes that are only whitespace must be filtered out.""" def test_whitespace_postcodes_excluded(self, tmp_path): """A PCDS value of ' ' should not survive loading.""" df = pl.DataFrame( { "GRIDGB1E": [500010, 500020, 500030], "GRIDGB1N": [180010, 180020, 180030], "PCDS": ["AA1 1AA", " ", "BB2 2BB"], "OA21CD": ["E00000001", "E00000001", "E00000002"], } ) path = tmp_path / "uprn.parquet" df.write_parquet(path) loaded_df, offsets = load_uprns(path) all_postcodes = loaded_df["PCDS"].to_list() assert "" not in all_postcodes, "Empty string postcode survived strip+filter" assert " " not in all_postcodes, "Whitespace postcode survived filter" def test_whitespace_only_oa_excluded(self, tmp_path): """An OA where all UPRNs have whitespace-only postcodes should not appear.""" df = pl.DataFrame( { "GRIDGB1E": [500010, 500020], "GRIDGB1N": [180010, 180020], "PCDS": [" ", " "], "OA21CD": ["E00000099", "E00000099"], } ) path = tmp_path / "uprn.parquet" df.write_parquet(path) loaded_df, _ = load_uprns(path) assert len(loaded_df) == 0 def test_non_english_oas_excluded(self, tmp_path): df = pl.DataFrame( { "GRIDGB1E": [500010, 300010], "GRIDGB1N": [180010, 220010], "PCDS": ["AA1 1AA", "CF1 1AA"], "OA21CD": ["E00000001", "W00000001"], } ) path = tmp_path / "uprn.parquet" df.write_parquet(path) loaded_df, offsets = load_uprns(path) assert set(offsets) == {"E00000001"} assert loaded_df["PCDS"].to_list() == ["AA1 1AA"] def test_terminated_postcodes_are_remapped(self, tmp_path): uprns = pl.DataFrame( { "GRIDGB1E": [500010], "GRIDGB1N": [180010], "PCDS": ["aa1 1aa"], "OA21CD": ["E00000001"], } ) uprn_path = tmp_path / "uprn.parquet" uprns.write_parquet(uprn_path) arcgis = pl.DataFrame( { "pcds": ["AA1 1AA", "AA1 1AB"], "east1m": [500010, 500030], "north1m": [180010, 180020], "oa21cd": ["E00000001", "E00000001"], "doterm": ["2020-01-01", None], "ctry25cd": ["E92000001", "E92000001"], } ) arcgis_path = tmp_path / "arcgis.parquet" arcgis.write_parquet(arcgis_path) loaded_df, _offsets = load_uprns(uprn_path, arcgis_path) assert loaded_df["PCDS"].to_list() == ["AA1 1AB"] def test_remapped_terminated_postcode_adopts_successor_oa(self, tmp_path): """When a terminated postcode is remapped to its active successor, the remapped seed point must carry the SUCCESSOR's OA (and coords), not the terminated postcode's original OA. Pre-fix the row kept OA21CD of the terminated postcode, seeding the successor into an OA it doesn't belong to and splitting its boundary across OAs.""" # Terminated AA1 1AA sits in OA E00000001. Its nearest active successor # AA1 1AB lives in a DIFFERENT OA (E00000002) far away. uprns = pl.DataFrame( { "GRIDGB1E": [500010], "GRIDGB1N": [180010], "PCDS": ["AA1 1AA"], "OA21CD": ["E00000001"], } ) uprn_path = tmp_path / "uprn.parquet" uprns.write_parquet(uprn_path) arcgis = pl.DataFrame( { "pcds": ["AA1 1AA", "AA1 1AB"], "east1m": [500010, 500030], "north1m": [180010, 180020], # AA1 1AA terminated → only AA1 1AB is an active successor, and # it belongs to a different OA than the terminated postcode. "oa21cd": ["E00000001", "E00000002"], "doterm": ["2020-01-01", None], "ctry25cd": ["E92000001", "E92000001"], } ) arcgis_path = tmp_path / "arcgis.parquet" arcgis.write_parquet(arcgis_path) loaded_df, offsets = load_uprns(uprn_path, arcgis_path) # The remapped point must be grouped under the successor's OA, not the # terminated postcode's OA. assert "E00000002" in offsets, "Successor OA missing — remap kept old OA" assert "E00000001" not in offsets, ( "Remapped point still lives in the terminated postcode's OA" ) points, postcodes = get_oa_uprns(loaded_df, offsets, "E00000002") assert postcodes == ["AA1 1AB"] # It should also adopt the successor's authoritative coordinates. assert points.tolist() == [[500030.0, 180020.0]] def test_arcgis_filters_to_active_english_postcodes(self, tmp_path): uprns = pl.DataFrame( { "GRIDGB1E": [500010, 500020], "GRIDGB1N": [180010, 180020], "PCDS": ["AA1 1AA", "CF1 1AA"], "OA21CD": ["E00000001", "E00000001"], } ) uprn_path = tmp_path / "uprn.parquet" uprns.write_parquet(uprn_path) arcgis = pl.DataFrame( { "pcds": ["AA1 1AA", "CF1 1AA"], "east1m": [500010, 300010], "north1m": [180010, 220010], "oa21cd": ["E00000001", "W00000001"], "doterm": [None, None], "ctry25cd": ["E92000001", "W92000004"], } ) arcgis_path = tmp_path / "arcgis.parquet" arcgis.write_parquet(arcgis_path) loaded_df, _offsets = load_uprns(uprn_path, arcgis_path) assert loaded_df["PCDS"].to_list() == ["AA1 1AA"] def test_arcgis_adds_centroid_seed_for_active_postcode_without_uprn(self, tmp_path): uprns = pl.DataFrame( { "GRIDGB1E": [500010], "GRIDGB1N": [180010], "PCDS": ["AA1 1AA"], "OA21CD": ["E00000001"], } ) uprn_path = tmp_path / "uprn.parquet" uprns.write_parquet(uprn_path) arcgis = pl.DataFrame( { "pcds": ["AA1 1AA", "BB1 1BB"], "east1m": [500010, 510000], "north1m": [180010, 190000], "oa21cd": ["E00000001", "E00000002"], "doterm": [None, None], "ctry25cd": ["E92000001", "E92000001"], } ) arcgis_path = tmp_path / "arcgis.parquet" arcgis.write_parquet(arcgis_path) loaded_df, offsets = load_uprns(uprn_path, arcgis_path) assert set(loaded_df["PCDS"].to_list()) == {"AA1 1AA", "BB1 1BB"} points, postcodes = get_oa_uprns(loaded_df, offsets, "E00000002") assert postcodes == ["BB1 1BB"] assert points.tolist() == [[510000.0, 190000.0]] # --------------------------------------------------------------------------- # Bug 3: Voronoi deduplication is first-seen-wins # --------------------------------------------------------------------------- class TestVoronoiDeduplication: """Multiple postcodes sharing a coordinate must all receive area.""" def test_shared_coords_both_postcodes_get_area(self, square_boundary): """Two postcodes with UPRNs at the same coords: both must get area.""" # Two postcodes each have one UPRN at (500050, 180050) # Plus postcode A has one at a different location points = np.array( [ [500020, 180050], # postcode A — unique location [500050, 180050], # postcode A — shared location [500050, 180050], # postcode B — shared location (same coords) [500080, 180050], # postcode B — unique location ] ) postcodes = ["A", "A", "B", "B"] result = compute_voronoi_regions(points, postcodes, square_boundary) assert "A" in result, "Postcode A should have Voronoi area" assert "B" in result, "Postcode B should have Voronoi area" def test_all_shared_coords_no_postcode_lost(self, square_boundary): """When all UPRNs for a postcode share coords with another, it must still get area.""" # Postcode B's only UPRN is at the same coords as postcode A's points = np.array( [ [500050, 180050], # postcode A [500050, 180050], # postcode B — identical coords ] ) postcodes = ["A", "B"] result = compute_voronoi_regions(points, postcodes, square_boundary) assert "A" in result, "Postcode A should have area" assert "B" in result, "Postcode B should have area" # Both should get roughly equal area since they're at the same location area_a = result["A"].area area_b = result["B"].area total = area_a + area_b assert area_a / total > 0.2, "Postcode A should have meaningful area" assert area_b / total > 0.2, "Postcode B should have meaningful area" def test_int64_coords_jitter_works(self, square_boundary): """Int64 coords (production dtype) must still jitter correctly.""" points = np.array([[500050, 180050], [500050, 180050]], dtype=np.int64) postcodes = ["A", "B"] result = compute_voronoi_regions(points, postcodes, square_boundary) assert "A" in result, "Postcode A missing with int64 coords" assert "B" in result, "Postcode B missing with int64 coords" # --------------------------------------------------------------------------- # Bug 4: Voronoi collinear fallback gives everything to first postcode # --------------------------------------------------------------------------- class TestVoronoiCollinear: """Collinear points (handled by dummy corners) must distribute area fairly.""" def test_collinear_points_all_postcodes_get_area(self, square_boundary): """Points along a line — every postcode must get area.""" points = np.array( [ [500020, 180050], [500040, 180050], [500060, 180050], [500080, 180050], ] ) postcodes = ["A", "A", "B", "B"] result = compute_voronoi_regions(points, postcodes, square_boundary) assert "A" in result, "Postcode A should have area" assert "B" in result, "Postcode B should have area" def test_collinear_points_area_roughly_fair(self, square_boundary): """With equal numbers of collinear points, area split should be roughly fair.""" points = np.array( [ [500030, 180050], [500070, 180050], ] ) postcodes = ["A", "B"] result = compute_voronoi_regions(points, postcodes, square_boundary) assert "A" in result and "B" in result area_a = result["A"].area area_b = result["B"].area ratio = min(area_a, area_b) / max(area_a, area_b) assert ratio > 0.3, f"Area split too unfair: {area_a:.0f} vs {area_b:.0f}" class TestVoronoiCoverage: """Voronoi fallback should cover large OAs even when UPRNs are clustered.""" def test_clustered_points_cover_large_boundary(self): boundary = box(0, 0, 5000, 100) points = np.array([[10, 50], [20, 50]]) result = compute_voronoi_regions(points, ["A", "B"], boundary) covered = unary_union(list(result.values())) assert covered.area == pytest.approx(boundary.area) assert boundary.difference(covered).area < 0.01 class TestEqualSplitFallback: """_equal_split_fallback must give every postcode some area.""" def test_all_postcodes_get_area(self, square_boundary): result = _equal_split_fallback(["A", "B", "C"], square_boundary) assert set(result.keys()) == {"A", "B", "C"} for pc, geom in result.items(): assert geom.area > 0, f"Postcode {pc} got zero area" def test_total_area_approximately_matches(self, square_boundary): result = _equal_split_fallback(["A", "B"], square_boundary) total = sum(g.area for g in result.values()) assert total == pytest.approx(square_boundary.area, rel=0.01) # --------------------------------------------------------------------------- # Bug 5: process_oa can produce non-polygon geometries from make_valid # --------------------------------------------------------------------------- class TestProcessOAGeometryTypes: """process_oa must return only Polygon/MultiPolygon fragments.""" def test_overlapping_inspire_no_postcode_overlap(self): """Overlapping INSPIRE parcels assigned to different postcodes must not overlap.""" oa_geom = box(500000, 180000, 500100, 180100) # Two overlapping parcels — left half and a wider middle section parcel_left = box(500000, 180000, 500060, 180100) parcel_right = box(500040, 180000, 500100, 180100) # overlaps left by 20m # UPRN in left parcel → postcode A, UPRN in right parcel → postcode B points = np.array( [ [500020, 180050], # postcode A — inside left parcel [500080, 180050], # postcode B — inside right parcel ] ) postcodes = ["A", "B"] fragments = process_oa( oa_geom, points, postcodes, inspire_candidates=[parcel_left, parcel_right] ) frag_dict = dict(fragments) assert "A" in frag_dict and "B" in frag_dict # The critical check: no overlap between the two fragments overlap = frag_dict["A"].intersection(frag_dict["B"]) assert overlap.area < 0.01, ( f"Postcodes A and B overlap by {overlap.area:.1f} sqm" ) def test_fragments_are_polygonal(self): """All fragments from process_oa must be Polygon or MultiPolygon.""" oa_geom = box(500000, 180000, 500100, 180100) points = np.array( [ [500020, 180020], [500080, 180080], ] ) postcodes = ["A", "B"] fragments = process_oa(oa_geom, points, postcodes, inspire_candidates=[]) for pc, geom in fragments: assert geom.geom_type in ("Polygon", "MultiPolygon"), ( f"Fragment for {pc} has unexpected type: {geom.geom_type}" ) assert not geom.is_empty, f"Fragment for {pc} is empty" def test_no_geometry_collection_in_output(self): """Even with tricky INSPIRE parcels, output should never be GeometryCollection.""" oa_geom = box(500000, 180000, 500100, 180100) # Create a thin sliver that make_valid might convert to a line sliver = Polygon( [ (500000, 180000), (500100, 180000), (500100, 180000.001), (500000, 180000), ] ) points = np.array( [ [500020, 180020], [500080, 180080], ] ) postcodes = ["A", "B"] fragments = process_oa(oa_geom, points, postcodes, inspire_candidates=[sliver]) for pc, geom in fragments: assert geom.geom_type in ("Polygon", "MultiPolygon"), ( f"Fragment for {pc} has type {geom.geom_type}" ) class TestProcessOAInspireParcelAssignment: """INSPIRE parcels without UPRNs should still shape postcode boundaries.""" def test_unoccupied_inspire_parcel_goes_to_nearest_postcode(self): """A parcel with no contained UPRN should not be split by Voronoi.""" oa_geom = box(0, 0, 100, 100) parcel = box(20, 40, 65, 60) # crosses the x=50 Voronoi split points = np.array( [ [10, 50], # postcode A [90, 50], # postcode B ] ) postcodes = ["A", "B"] fragments = process_oa(oa_geom, points, postcodes, inspire_candidates=[parcel]) frag_dict = dict(fragments) assert "A" in frag_dict and "B" in frag_dict assert parcel.difference(frag_dict["A"]).area < 0.01 assert frag_dict["B"].intersection(parcel).area < 0.01 def test_contained_uprn_claim_wins_over_overlapping_nearest_parcel(self): """Contained-UPRN parcel claims should keep priority over nearest claims.""" oa_geom = box(0, 0, 100, 100) contained_a = box(0, 0, 60, 100) unoccupied_nearer_b = box(50, 0, 80, 100) points = np.array( [ [20, 50], # postcode A, inside contained_a [90, 50], # postcode B, outside unoccupied_nearer_b ] ) postcodes = ["A", "B"] fragments = process_oa( oa_geom, points, postcodes, inspire_candidates=[contained_a, unoccupied_nearer_b], ) frag_dict = dict(fragments) assert "A" in frag_dict and "B" in frag_dict assert contained_a.difference(frag_dict["A"]).area < 0.01 assert frag_dict["A"].intersection(frag_dict["B"]).area < 0.01 assert frag_dict["B"].intersection(box(60, 0, 80, 100)).area > 0 def test_nearest_uses_assignable_fragment_after_contained_subtraction(self): """Nearest assignment should use the part left after priority subtraction.""" oa_geom = box(0, 0, 100, 100) contained_a = box(0, 0, 60, 100) unoccupied = box(25, 0, 80, 100) points = np.array( [ [20, 50], # postcode A, inside contained_a [90, 50], # postcode B, nearest to unoccupied remainder ] ) postcodes = ["A", "B"] fragments = process_oa( oa_geom, points, postcodes, inspire_candidates=[contained_a, unoccupied], ) frag_dict = dict(fragments) assert contained_a.difference(frag_dict["A"]).area < 0.01 assert box(60, 0, 80, 100).difference(frag_dict["B"]).area < 0.01 def test_boundary_uprn_does_not_claim_adjacent_parcel(self): """A UPRN on a parcel edge should not count inside both parcels.""" oa_geom = box(0, 0, 100, 100) left = box(0, 0, 50, 100) right = box(50, 0, 100, 100) points = np.array( [ [50, 50], # postcode A, exactly on shared parcel boundary [75, 50], # postcode B, strictly inside right parcel ] ) postcodes = ["A", "B"] fragments = process_oa( oa_geom, points, postcodes, inspire_candidates=[left, right] ) frag_dict = dict(fragments) assert "A" in frag_dict and "B" in frag_dict assert right.difference(frag_dict["B"]).area < 0.01 def test_disconnected_nearest_fragments_can_go_to_different_postcodes(self): """A split unoccupied parcel should be assigned component by component.""" oa_geom = box(0, 0, 100, 100) contained_b = box(40, 0, 60, 100) unoccupied = box(0, 40, 100, 60) points = np.array( [ [10, 20], # postcode A, nearest to left split fragment [50, 20], # postcode B, inside contained_b but outside unoccupied [90, 20], # postcode C, nearest to right split fragment ] ) postcodes = ["A", "B", "C"] fragments = process_oa( oa_geom, points, postcodes, inspire_candidates=[contained_b, unoccupied], ) frag_dict = dict(fragments) assert box(0, 40, 40, 60).difference(frag_dict["A"]).area < 0.01 assert box(60, 40, 100, 60).difference(frag_dict["C"]).area < 0.01 def test_overlapping_nearest_parcels_do_not_overlap_in_output(self): """Two unoccupied nearest-assigned parcels should be resolved cleanly.""" oa_geom = box(0, 0, 100, 100) left = box(0, 0, 70, 100) right = box(30, 0, 100, 100) points = np.array( [ [10, 50], # postcode A, nearest to left parcel [90, 50], # postcode B, nearest to right parcel ] ) postcodes = ["A", "B"] fragments = process_oa( oa_geom, points, postcodes, inspire_candidates=[left, right] ) frag_dict = dict(fragments) assert "A" in frag_dict and "B" in frag_dict assert frag_dict["A"].intersection(frag_dict["B"]).area < 0.01 def test_mixed_inspire_and_voronoi_covers_oa_without_overlap(self): """Parcel claims plus Voronoi fallback should cover the whole OA.""" oa_geom = box(0, 0, 100, 100) contained_a = box(0, 0, 30, 100) unoccupied = box(70, 0, 90, 100) points = np.array( [ [10, 50], [90, 50], ] ) postcodes = ["A", "B"] fragments = process_oa( oa_geom, points, postcodes, inspire_candidates=[contained_a, unoccupied], ) geoms = [geom for _, geom in fragments] covered = unary_union(geoms) overlap = sum(geom.area for geom in geoms) - covered.area assert covered.area == pytest.approx(oa_geom.area) assert oa_geom.difference(covered).area < 0.01 assert overlap < 0.01 def test_inspire_parcel_straddling_oa_is_clipped(self): """INSPIRE parcels crossing the OA boundary should not leak outside it.""" oa_geom = box(0, 0, 100, 100) straddling = box(80, 0, 140, 100) points = np.array( [ [10, 50], [90, 50], ] ) postcodes = ["A", "B"] fragments = process_oa( oa_geom, points, postcodes, inspire_candidates=[straddling] ) for _, geom in fragments: assert geom.difference(oa_geom).area < 0.01 def test_shared_parcel_keeps_every_contained_postcode(self): """A single parcel containing UPRNs for [A, A, B] must yield a fragment for BOTH A and B. Pre-fix the majority winner (A) claimed the whole parcel, excluding it from `remaining`, so B's UPRNs were trapped inside claimed land and B vanished entirely (no fragment).""" oa_geom = box(0, 0, 100, 100) parcel = box(0, 0, 100, 100) # one parcel covering the whole OA points = np.array( [ [20, 50], # postcode A [30, 50], # postcode A (majority) [80, 50], # postcode B (minority — would be dropped pre-fix) ] ) postcodes = ["A", "A", "B"] fragments = process_oa(oa_geom, points, postcodes, inspire_candidates=[parcel]) frag_dict = dict(fragments) assert "A" in frag_dict, "Majority postcode A must keep a fragment" assert "B" in frag_dict, "Minority postcode B must not be dropped" assert frag_dict["A"].area > 0 assert frag_dict["B"].area > 0 # The split must partition the parcel without overlap. assert frag_dict["A"].intersection(frag_dict["B"]).area < 0.01 # --------------------------------------------------------------------------- # _extract_polygonal helper # --------------------------------------------------------------------------- class TestExtractPolygonal: """_extract_polygonal must strip non-polygon parts from geometry collections.""" def test_polygon_passthrough(self): poly = box(0, 0, 10, 10) assert _extract_polygonal(poly) is poly def test_multipolygon_passthrough(self): mp = MultiPolygon([box(0, 0, 10, 10), box(20, 20, 30, 30)]) assert _extract_polygonal(mp) is mp def test_geometry_collection_extracts_polygon(self): from shapely.geometry import GeometryCollection, LineString poly = box(0, 0, 10, 10) line = LineString([(0, 0), (10, 10)]) gc = GeometryCollection([poly, line]) result = _extract_polygonal(gc) assert result is not None assert result.geom_type == "Polygon" assert result.area == pytest.approx(poly.area) def test_geometry_collection_no_polygons_returns_none(self): from shapely.geometry import GeometryCollection, LineString, Point gc = GeometryCollection([LineString([(0, 0), (1, 1)]), Point(5, 5)]) assert _extract_polygonal(gc) is None def test_line_returns_none(self): from shapely.geometry import LineString assert _extract_polygonal(LineString([(0, 0), (1, 1)])) is None def test_overlapping_collection_unioned_to_valid(self): """A GeometryCollection with OVERLAPPING polygons must be unioned into a VALID geometry (not a raw MultiPolygon, which would be invalid and crash the next .difference()), and must not double-count the overlap area.""" from shapely.geometry import GeometryCollection a = box(0, 0, 100, 100) b = box(50, 50, 150, 150) # overlaps a by 50x50 result = _extract_polygonal(GeometryCollection([a, b])) assert result is not None assert result.is_valid assert result.area == pytest.approx(unary_union([a, b]).area) # And the formerly-crashing op now works: assert result.difference(box(0, 0, 10, 10)).is_valid # --------------------------------------------------------------------------- # Edge case: merge_fragments handles single-OA postcodes # --------------------------------------------------------------------------- class TestMergeFragments: """merge_fragments must handle edge cases cleanly.""" def test_single_fragment_passthrough(self): """A postcode with one fragment should pass through unchanged.""" poly = box(0, 0, 100, 100) result = merge_fragments([("AA1 1AA", poly)]) assert "AA1 1AA" in result assert result["AA1 1AA"].equals(poly) def test_empty_fragments_excluded(self): """Empty geometries should not appear in output.""" empty = Polygon() result = merge_fragments([("AA1 1AA", empty)]) assert "AA1 1AA" not in result # --------------------------------------------------------------------------- # Edge case: to_wgs84_geojson handles degenerate geometries # --------------------------------------------------------------------------- class TestToWgs84Geojson: """to_wgs84_geojson must handle edge cases.""" def test_empty_geometry_returns_none(self): assert to_wgs84_geojson(Polygon()) is None def test_valid_polygon_returns_geojson(self): # Small square in BNG poly = box(530000, 180000, 530100, 180100) result = to_wgs84_geojson(poly) assert result is not None assert result["type"] == "Polygon" assert len(result["coordinates"]) >= 1 assert len(result["coordinates"][0]) >= 4 # closed ring def test_multipolygon_returns_largest(self): """MultiPolygon input should return only the largest polygon.""" big = box(530000, 180000, 530100, 180100) small = box(530200, 180200, 530210, 180210) mp = MultiPolygon([big, small]) result = to_wgs84_geojson(mp) assert result is not None assert result["type"] == "Polygon" def test_coordinates_have_limited_precision(self): """GeoJSON coordinates should be rounded to 6 decimal places.""" import json poly = box(530000, 180000, 530100, 180100) result = to_wgs84_geojson(poly) assert result is not None # Check precision via JSON serialization (what actually hits disk) for lon, lat in result["coordinates"][0]: lon_s = json.dumps(lon) lat_s = json.dumps(lat) lon_dp = len(lon_s.split(".")[1]) if "." in lon_s else 0 lat_dp = len(lat_s.split(".")[1]) if "." in lat_s else 0 assert lon_dp <= 6, f"Longitude {lon_s} has {lon_dp} decimal places" assert lat_dp <= 6, f"Latitude {lat_s} has {lat_dp} decimal places" def test_write_district_geojson_replaces_stale_units(self, tmp_path): stale_units = tmp_path / "units" stale_units.mkdir() (stale_units / "ZZ1.geojson").write_text( json.dumps({"type": "FeatureCollection", "features": []}) ) file_count = write_district_geojson( {"AA1 1AA": box(530000, 180000, 530100, 180100)}, tmp_path ) assert file_count == 1 assert not (stale_units / "ZZ1.geojson").exists() written = json.loads((stale_units / "AA1.geojson").read_text()) assert written["features"][0]["properties"]["postcodes"] == "AA1 1AA" # --------------------------------------------------------------------------- # Edge case: parse_gpkg_geometry rejects unknown envelope types # --------------------------------------------------------------------------- class TestParseGpkgGeometry: """parse_gpkg_geometry must raise on unknown envelope types.""" def test_unknown_envelope_type_raises(self): # Build a minimal GeoPackage blob with envelope_type=5 # Byte 3 = flags: envelope_type in bits 3-1, so type 5 = 0b00001010 blob = bytes([0x47, 0x50, 0x00, 0b00001010]) + b"\x00" * 100 with pytest.raises(ValueError, match="Unknown GeoPackage envelope type 5"): parse_gpkg_geometry(blob) # --------------------------------------------------------------------------- # _fill_holes removes interior rings # --------------------------------------------------------------------------- class TestFillHoles: """_fill_holes fills small artifact holes but keeps large (real-enclosed) ones.""" def test_small_artifact_hole_filled(self): """A small (<1000 m²) interior ring is an artifact and gets filled.""" outer = [(0, 0), (100, 0), (100, 100), (0, 100), (0, 0)] hole = [(40, 40), (60, 40), (60, 60), (40, 60), (40, 40)] # 20x20 = 400 m² poly_with_hole = Polygon(outer, [hole]) assert len(list(poly_with_hole.interiors)) == 1 result = _fill_holes(poly_with_hole) assert result.geom_type == "Polygon" assert len(list(result.interiors)) == 0 assert result.area == pytest.approx(Polygon(outer).area) def test_large_hole_kept(self): """A large (>=1000 m²) hole is likely a real enclosed postcode — keep it.""" outer = [(0, 0), (100, 0), (100, 100), (0, 100), (0, 0)] hole = [(20, 20), (80, 20), (80, 80), (20, 80), (20, 20)] # 60x60 = 3600 m² poly_with_hole = Polygon(outer, [hole]) result = _fill_holes(poly_with_hole) assert len(list(result.interiors)) == 1 assert result.area == pytest.approx(10000 - 3600) def test_multipolygon_with_holes(self): """A MultiPolygon where each part has holes should have all holes removed.""" outer1 = [(0, 0), (50, 0), (50, 50), (0, 50), (0, 0)] hole1 = [(10, 10), (20, 10), (20, 20), (10, 20), (10, 10)] outer2 = [(60, 60), (110, 60), (110, 110), (60, 110), (60, 60)] hole2 = [(70, 70), (80, 70), (80, 80), (70, 80), (70, 70)] mp = MultiPolygon([Polygon(outer1, [hole1]), Polygon(outer2, [hole2])]) result = _fill_holes(mp) assert result.geom_type == "MultiPolygon" for p in result.geoms: assert len(list(p.interiors)) == 0 def test_polygon_without_hole_unchanged(self): """A polygon with no holes should pass through unchanged.""" poly = box(0, 0, 100, 100) result = _fill_holes(poly) assert result.area == pytest.approx(poly.area) # --------------------------------------------------------------------------- # Improved merge with 5m buffer closes 3m gaps # --------------------------------------------------------------------------- class TestMergeImprovedBuffer: """The 5m buffer should close gaps that the old 1m buffer could not.""" def test_3m_gap_merged(self): """Two fragments with a 3m gap should merge into a single polygon.""" left = box(0, 0, 50, 100) right = box(53, 0, 100, 100) # 3m gap at x=50..53 result = merge_fragments([("AA1 1AA", left), ("AA1 1AA", right)]) assert "AA1 1AA" in result geom = result["AA1 1AA"] assert geom.geom_type == "Polygon", ( f"Expected single Polygon after merging 3m gap, got {geom.geom_type}" ) def test_holes_removed_after_merge(self): """Interior holes created by merging should be filled.""" # Create a donut-like shape from fragments outer = box(0, 0, 100, 100) inner = box(30, 30, 70, 70) ring = outer.difference(inner) # Add the inner piece as a separate fragment result = merge_fragments([("AA1 1AA", ring), ("AA1 1AA", inner)]) assert "AA1 1AA" in result geom = result["AA1 1AA"] assert len(list(geom.interiors)) == 0, "Merged polygon should have no holes" # --------------------------------------------------------------------------- # subtract_greenspace # --------------------------------------------------------------------------- class TestSubtractGreenspace: """subtract_greenspace must remove park/water area from postcode polygons.""" def test_park_subtracted(self): """A park overlapping a postcode should reduce its area.""" from shapely.strtree import STRtree postcode = box(0, 0, 100, 100) # 10000 sqm park = box(60, 0, 100, 100) # 4000 sqm overlap on the right tree = STRtree([park]) geoms = [park] result = subtract_greenspace(postcode, tree, geoms) # Should have lost ~4000 sqm assert result.area == pytest.approx(6000, rel=0.01) def test_no_greenspace_unchanged(self): """With no overlapping greenspace, the geometry should be unchanged.""" from shapely.strtree import STRtree postcode = box(0, 0, 100, 100) park = box(200, 200, 300, 300) # far away tree = STRtree([park]) geoms = [park] result = subtract_greenspace(postcode, tree, geoms) assert result.area == pytest.approx(postcode.area) def test_full_overlap_preserves_postcode(self): """If greenspace covers the entire postcode, keep the original.""" from shapely.strtree import STRtree postcode = box(0, 0, 100, 100) park = box(-10, -10, 110, 110) # completely covers postcode tree = STRtree([park]) geoms = [park] result = subtract_greenspace(postcode, tree, geoms) # Should keep original since subtraction would erase entirely assert result.area == pytest.approx(postcode.area) def test_over_90pct_removal_preserves_postcode(self): """If greenspace would remove >90% of area, keep the original.""" from shapely.strtree import STRtree postcode = box(0, 0, 100, 100) # 10000 sqm park = box(5, 0, 100, 100) # 9500 sqm overlap = 95% removal tree = STRtree([park]) geoms = [park] result = subtract_greenspace(postcode, tree, geoms) # Should keep original since >90% would be removed assert result.area == pytest.approx(postcode.area) def test_under_90pct_removal_subtracts(self): """If greenspace removes <90%, subtraction should proceed.""" from shapely.strtree import STRtree postcode = box(0, 0, 100, 100) # 10000 sqm park = box(20, 0, 100, 100) # 8000 sqm overlap = 80% removal tree = STRtree([park]) geoms = [park] result = subtract_greenspace(postcode, tree, geoms) # 80% < 90% cap, so subtraction should happen assert result.area == pytest.approx(2000, rel=0.01) class TestToWgs84GeojsonValidity: """to_wgs84_geojson must emit GeoJSON that round-trips to a valid geometry.""" def test_geojson_round_trips_to_valid_geometry(self): from shapely.geometry import shape geojson = to_wgs84_geojson(box(530000, 180000, 530100, 180100)) assert geojson is not None rt = shape(geojson) assert not rt.is_empty assert rt.is_valid def test_written_district_features_are_all_valid(self, tmp_path): from shapely.geometry import shape postcodes = { "AA1 1AA": box(530000, 180000, 530100, 180100), "AA1 1AB": MultiPolygon( [ box(530200, 180000, 530250, 180050), box(530200, 180060, 530250, 180110), ] ), } assert write_district_geojson(postcodes, tmp_path) == 1 collection = json.loads((tmp_path / "units" / "AA1.geojson").read_text()) for feature in collection["features"]: geom = shape(feature["geometry"]) assert geom.is_valid assert not geom.is_empty class TestGreenspaceHolePreserved: """Interior holes carved by greenspace subtraction must survive merge_fragments (the post-subtraction _fill_holes that previously negated them was removed).""" def test_interior_lake_hole_survives_merge_fragments(self): from shapely.strtree import STRtree postcode = box(0, 0, 100, 100) # 10000 sqm lake = box(30, 30, 70, 70) # 1600 sqm fully-interior hole (16% removal) result = merge_fragments( [("TEST1", postcode)], greenspace_tree=STRtree([lake]), greenspace_geoms=[lake], ) merged = result["TEST1"] assert len(list(merged.interiors)) == 1 assert merged.area == pytest.approx(10000 - 1600, rel=0.05) # --------------------------------------------------------------------------- # merge_fragments keeps substantial detached parts (no OA-seam coverage gaps) # --------------------------------------------------------------------------- class TestKeepDetachedParts: """A postcode split across an OA seam (railway/river) must keep both parts instead of dropping all but the largest, which left ~1.8% uncovered gaps.""" def test_far_apart_parts_both_kept(self): # Two 50x50m blocks 30m apart — wider than the 10m merge buffer. a = box(0, 0, 50, 50) # 2500 m² b = box(80, 0, 130, 50) # 2500 m², 30m gap geom = merge_fragments([("AA1 1AA", a), ("AA1 1AA", b)])["AA1 1AA"] assert geom.geom_type == "MultiPolygon" assert len(geom.geoms) == 2 assert geom.area == pytest.approx(5000, rel=0.01) def test_tiny_noise_part_dropped(self): main = box(0, 0, 100, 100) # 10000 m² noise = box(200, 200, 205, 205) # 25 m² < 100 m² threshold geom = merge_fragments([("AA1 1AA", main), ("AA1 1AA", noise)])["AA1 1AA"] assert geom.geom_type == "Polygon" assert geom.area == pytest.approx(10000, rel=0.01) class TestMultiPolygonOutput: """to_wgs84_geojson_multi / the writer must emit MultiPolygon for split postcodes (the Rust server + loader already parse MultiPolygon).""" def test_multipolygon_preserves_all_parts(self): from shapely.geometry import shape mp = MultiPolygon( [ box(530000, 180000, 530100, 180100), box(531000, 180000, 531100, 180100), ] ) gj = to_wgs84_geojson_multi(mp) assert gj["type"] == "MultiPolygon" assert len(gj["coordinates"]) == 2 rt = shape(gj) assert rt.is_valid and not rt.is_empty assert len(rt.geoms) == 2 def test_single_part_stays_polygon(self): gj = to_wgs84_geojson_multi(box(530000, 180000, 530100, 180100)) assert gj["type"] == "Polygon" def test_writer_emits_multipolygon_feature(self, tmp_path): mp = MultiPolygon( [ box(530000, 180000, 530100, 180100), box(531000, 180000, 531100, 180100), ] ) assert write_district_geojson({"AA1 1AA": mp}, tmp_path) == 1 coll = json.loads((tmp_path / "units" / "AA1.geojson").read_text()) assert coll["features"][0]["geometry"]["type"] == "MultiPolygon" class TestOutputPartition: """The writer must emit a partition: overlapping postcodes are made disjoint (no two cover the same ground) without dropping an active postcode.""" def test_overlapping_postcodes_made_disjoint(self, tmp_path): from shapely.geometry import shape a = box(530000, 180000, 530100, 180100) b = box(530090, 180000, 530200, 180100) # overlaps `a` in a 10m strip assert a.intersection(b).area > 0 # precondition: they overlap write_district_geojson({"AA1 1AA": a, "AA1 1AB": b}, tmp_path) coll = json.loads((tmp_path / "units" / "AA1.geojson").read_text()) geoms = { f["properties"]["postcodes"]: shape(f["geometry"]) for f in coll["features"] } assert set(geoms) == {"AA1 1AA", "AA1 1AB"} # neither dropped # Disjoint interiors (share at most an edge). assert geoms["AA1 1AA"].intersection(geoms["AA1 1AB"]).area == pytest.approx( 0.0, abs=1e-12 ) assert all(g.area > 0 for g in geoms.values()) def test_enclosed_postcode_makes_container_a_donut(self, tmp_path): """A postcode fully INSIDE another must stay disjoint: the smaller (inner) keeps its area, the container gets a hole. A plain `overlaps` query misses containment, so this is the regression guard for that fix.""" from shapely.geometry import shape outer = box(530000, 180000, 530300, 180300) # 90,000 m² inner = box(530100, 180100, 530200, 180200) # 10,000 m², fully inside outer assert outer.contains(inner) # precondition write_district_geojson({"AA1 1AA": outer, "AA1 1AB": inner}, tmp_path) coll = json.loads((tmp_path / "units" / "AA1.geojson").read_text()) geoms = { f["properties"]["postcodes"]: shape(f["geometry"]) for f in coll["features"] } assert set(geoms) == {"AA1 1AA", "AA1 1AB"} # neither dropped assert geoms["AA1 1AA"].intersection(geoms["AA1 1AB"]).area == pytest.approx( 0.0, abs=1e-12 ) # Container is now a donut around the enclosed postcode. assert geoms["AA1 1AA"].geom_type == "Polygon" assert len(list(geoms["AA1 1AA"].interiors)) == 1 assert geoms["AA1 1AB"].area > 0 # --------------------------------------------------------------------------- # InspireIndex must return the same candidates as a brute-force bbox scan # --------------------------------------------------------------------------- class TestInspireIndex: """The grid index replaces a per-OA linear scan of all parcel bboxes; it must return an identical candidate set (and order) so Phase 3 output is unchanged.""" @staticmethod def _brute(bboxes, box): e0, n0, e1, n1 = box mask = ( (bboxes[:, 2] >= e0) & (bboxes[:, 0] <= e1) & (bboxes[:, 3] >= n0) & (bboxes[:, 1] <= n1) ) return np.where(mask)[0] def test_matches_brute_force_over_random_queries(self): rng = np.random.default_rng(0) x = rng.uniform(0, 10000, 5000) y = rng.uniform(0, 10000, 5000) w = rng.uniform(1, 60, 5000) # all <= 500m cell → CSR path h = rng.uniform(1, 60, 5000) bboxes = np.column_stack([x, y, x + w, y + h]).astype(np.float64) idx = build_inspire_index(bboxes, None, None, cell_size=500.0) for _ in range(400): cx, cy = rng.uniform(0, 10000), rng.uniform(0, 10000) sz = float(rng.choice([30.0, 200.0, 1000.0, 3000.0])) box = (cx, cy, cx + sz, cy + sz) got = idx.candidate_indices(box) expected = np.sort(self._brute(bboxes, box)) assert np.array_equal(got, expected) def test_oversized_parcel_is_found(self): # A parcel larger than a cell goes to the overflow list, not the grid; # a query deep inside it (away from the small parcels) must still find it. bboxes = np.array( [ [0.0, 0.0, 5000.0, 5000.0], # 5km parcel >> 500m cell [100.0, 100.0, 120.0, 120.0], [4000.0, 4000.0, 4020.0, 4020.0], ] ) idx = build_inspire_index(bboxes, None, None, cell_size=500.0) box = (2000.0, 2000.0, 2050.0, 2050.0) got = idx.candidate_indices(box) assert 0 in got assert np.array_equal(got, np.sort(self._brute(bboxes, box))) def test_no_overlap_returns_empty(self): bboxes = np.array([[0.0, 0.0, 10.0, 10.0], [20.0, 20.0, 30.0, 30.0]]) idx = build_inspire_index(bboxes, None, None, cell_size=500.0) assert len(idx.candidate_indices((100.0, 100.0, 110.0, 110.0))) == 0 # --------------------------------------------------------------------------- # Parallel OA processing must match the sequential result exactly # --------------------------------------------------------------------------- class TestParallelProcessing: """_process_oas across workers must produce the same fragments as workers=1. Uses single-postcode OAs (fast path), so it exercises the chunking + WKB round-trip + fork machinery without needing INSPIRE data.""" @staticmethod def _inputs(n_oas=60): import pyarrow as pa oa_geoms = { f"E{i:08d}": box(i * 100.0, 0.0, i * 100.0 + 50.0, 50.0) for i in range(n_oas) } codes = sorted(oa_geoms) east, north, pcs = [], [], [] offsets = {} pos = 0 for i, code in enumerate(codes): east += [i * 100.0 + 10.0, i * 100.0 + 20.0] north += [10.0, 20.0] pcs += [f"AA{i % 5} {i % 9}AA"] * 2 # one postcode per OA → fast path offsets[code] = (pos, pos + 2) pos += 2 return ( codes, oa_geoms, np.array(east), np.array(north), pa.array(pcs, type=pa.large_string()), offsets, ) @staticmethod def _norm(frags): return sorted((pc, geom.wkb_hex) for pc, geom in frags) def test_parallel_matches_sequential(self): codes, oa, east, north, pcs, offs = self._inputs() seq, s1 = _process_oas(codes, oa, east, north, pcs, offs, None, workers=1) par, s2 = _process_oas(codes, oa, east, north, pcs, offs, None, workers=3) assert len(seq) == len(codes) # one fragment per single-postcode OA assert s1 == s2 == len(codes) assert self._norm(seq) == self._norm(par) def test_oa_failure_is_tagged_with_oa_code(self): """A failure inside per-OA processing must re-raise with the OA code, so a single bad OA is attributable instead of an anonymous worker abort.""" # Missing OA in the geoms dict → KeyError, wrapped with the OA code. with pytest.raises(RuntimeError, match="E00099999"): _oa_fragments("E00099999", {}, None, None, None, {}, None) class TestDegenerateGeometryHandling: """Every active postcode must keep a boundary (validate_outputs is strict), so a sub-grid sliver is fattened rather than dropped. A genuinely empty geometry is skipped without aborting the whole write (the 10h regression).""" # Three near-collinear vertices in BNG: bbox ~28m x 7m but area ~0.04 m², # i.e. AL10 0TU. Without the rescue it snaps to empty at output precision. SLIVER = Polygon( [(523045.34, 209625.56), (523040.47, 209624.33), (523017.0, 209618.42)] ) def test_sliver_is_rescued_to_valid_geometry(self): from shapely.geometry import shape result = to_wgs84_geojson(self.SLIVER) assert result is not None, "sliver must be rescued, not dropped" rt = shape(result) assert not rt.is_empty assert rt.is_valid def test_collinear_zero_area_input_is_rescued(self): """A zero-area collinear 'polygon' (can't be cleaned to a polygon) must still be rescued via the representative-point fallback, not dropped.""" from shapely.geometry import shape degenerate = Polygon( [(523000, 209600), (523010, 209600), (523020, 209600), (523000, 209600)] ) assert degenerate.area == 0.0 result = to_wgs84_geojson(degenerate) assert result is not None, "degenerate input must be rescued, not dropped" rt = shape(result) assert not rt.is_empty assert rt.is_valid def test_sliver_postcode_present_in_output(self, tmp_path): postcodes = { "AA1 1AA": box(530000, 180000, 530100, 180100), "AA1 1AB": self.SLIVER, # must survive } file_count = write_district_geojson(postcodes, tmp_path) assert file_count == 1 collection = json.loads((tmp_path / "units" / "AA1.geojson").read_text()) written = {f["properties"]["postcodes"] for f in collection["features"]} assert written == {"AA1 1AA", "AA1 1AB"} def test_empty_geometry_skipped_not_raised(self, tmp_path): # The last-resort safety net: an unrescuable (empty) geometry is skipped # so one bad postcode can never abort a multi-hour run. postcodes = { "AA1 1AA": box(530000, 180000, 530100, 180100), "AA1 1AB": Polygon(), # genuinely empty } file_count = write_district_geojson(postcodes, tmp_path) assert file_count == 1 collection = json.loads((tmp_path / "units" / "AA1.geojson").read_text()) written = {f["properties"]["postcodes"] for f in collection["features"]} assert written == {"AA1 1AA"} # --------------------------------------------------------------------------- # fragments_cache round-trips Phase 3 output and validates freshness # --------------------------------------------------------------------------- class TestFragmentsCache: """Persisting Phase 3 lets a crashed run resume without the ~10h OA loop.""" def test_round_trip_preserves_postcodes_and_geometry(self, tmp_path): fragments = [ ("AA1 1AA", box(0, 0, 100, 100)), ("AA1 1AB", box(200, 200, 250, 260)), # A postcode spanning multiple OAs appears as repeated entries. ("AA1 1AA", box(100, 0, 150, 100)), ("AA1 1AC", MultiPolygon([box(0, 0, 10, 10), box(20, 20, 30, 30)])), ] cache = tmp_path / "fragments_cache.parquet" save_fragments(cache, fragments) loaded = load_fragments(cache) assert [pc for pc, _ in loaded] == [pc for pc, _ in fragments] for (_, original), (_, restored) in zip(fragments, loaded): assert restored.equals(original) def test_save_is_atomic_no_tmp_left_behind(self, tmp_path): cache = tmp_path / "fragments_cache.parquet" save_fragments(cache, [("AA1 1AA", box(0, 0, 1, 1))]) assert cache.exists() assert not (tmp_path / "fragments_cache.parquet.tmp").exists() def test_missing_cache_is_not_fresh(self, tmp_path): cache = tmp_path / "fragments_cache.parquet" inp = tmp_path / "uprn.parquet" inp.write_text("x") assert fragments_cache_is_fresh(cache, [inp]) is False def test_cache_newer_than_inputs_is_fresh(self, tmp_path): import os inp = tmp_path / "uprn.parquet" inp.write_text("x") cache = tmp_path / "fragments_cache.parquet" cache.write_text("c") os.utime(inp, (1_000, 1_000)) os.utime(cache, (2_000, 2_000)) assert fragments_cache_is_fresh(cache, [inp, None]) is True def test_cache_older_than_any_input_is_stale(self, tmp_path): import os inp = tmp_path / "oa.gpkg" inp.write_text("x") cache = tmp_path / "fragments_cache.parquet" cache.write_text("c") os.utime(cache, (1_000, 1_000)) os.utime(inp, (2_000, 2_000)) # input touched after the cache assert fragments_cache_is_fresh(cache, [inp]) is False def test_missing_input_is_ignored(self, tmp_path): cache = tmp_path / "fragments_cache.parquet" cache.write_text("c") # arcgis is optional/absent — it cannot have invalidated the cache. assert fragments_cache_is_fresh(cache, [tmp_path / "absent.parquet"]) is True