"""Tests for the postcode_boundaries module. Each test targets a specific bug or edge case identified during code review. """ import numpy as np import polars as pl import pytest from shapely.geometry import MultiPolygon, Polygon, box from .oa_boundaries import parse_gpkg_geometry from .greenspace import subtract_greenspace from .output import _fill_holes, merge_fragments, to_wgs84_geojson from .process_oa import _extract_polygonal, process_oa from .uprn import get_oa_uprns, load_uprns from .voronoi import _equal_split_fallback, compute_voronoi_regions # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def square_boundary(): """A 100x100m square OA boundary in BNG coords.""" return box(500000, 180000, 500100, 180100) @pytest.fixture def uprn_parquet(tmp_path): """Write a minimal UPRN parquet file with 3 OAs, return its path.""" df = pl.DataFrame( { "GRIDGB1E": [500010, 500020, 500030, 500040, 500050, 500060], "GRIDGB1N": [180010, 180020, 180030, 180040, 180050, 180060], "PCDS": ["AA1 1AA", "AA1 1AB", "BB2 2BB", "BB2 2BC", "CC3 3CC", "CC3 3CD"], "OA21CD": [ "E00000001", "E00000001", "E00000002", "E00000002", "E00000003", "E00000003", ], } ) path = tmp_path / "uprn.parquet" df.write_parquet(path) return path # --------------------------------------------------------------------------- # Bug 1: First OA silently dropped # --------------------------------------------------------------------------- class TestFirstOADropped: """The UPRN offset computation drops the first OA (alphabetically).""" def test_first_oa_present_in_offsets(self, uprn_parquet): """E00000001 is the first OA after sorting. It must appear in offsets.""" df, offsets = load_uprns(uprn_parquet) assert "E00000001" in offsets, ( "First OA (E00000001) missing from offsets — shift(1) null comparison bug" ) def test_all_oas_present(self, uprn_parquet): """Every OA in the data must have an offset entry.""" df, offsets = load_uprns(uprn_parquet) assert set(offsets.keys()) == {"E00000001", "E00000002", "E00000003"} def test_first_oa_data_accessible(self, uprn_parquet): """UPRNs for the first OA must be retrievable via get_oa_uprns.""" df, offsets = load_uprns(uprn_parquet) points, postcodes = get_oa_uprns(df, offsets, "E00000001") assert len(postcodes) == 2 assert set(postcodes) == {"AA1 1AA", "AA1 1AB"} # --------------------------------------------------------------------------- # Bug 2: Whitespace-only postcodes slip through # --------------------------------------------------------------------------- class TestWhitespacePostcodes: """Postcodes that are only whitespace must be filtered out.""" def test_whitespace_postcodes_excluded(self, tmp_path): """A PCDS value of ' ' should not survive loading.""" df = pl.DataFrame( { "GRIDGB1E": [500010, 500020, 500030], "GRIDGB1N": [180010, 180020, 180030], "PCDS": ["AA1 1AA", " ", "BB2 2BB"], "OA21CD": ["E00000001", "E00000001", "E00000002"], } ) path = tmp_path / "uprn.parquet" df.write_parquet(path) loaded_df, offsets = load_uprns(path) all_postcodes = loaded_df["PCDS"].to_list() assert "" not in all_postcodes, "Empty string postcode survived strip+filter" assert " " not in all_postcodes, "Whitespace postcode survived filter" def test_whitespace_only_oa_excluded(self, tmp_path): """An OA where all UPRNs have whitespace-only postcodes should not appear.""" df = pl.DataFrame( { "GRIDGB1E": [500010, 500020], "GRIDGB1N": [180010, 180020], "PCDS": [" ", " "], "OA21CD": ["E00000099", "E00000099"], } ) path = tmp_path / "uprn.parquet" df.write_parquet(path) loaded_df, _ = load_uprns(path) assert len(loaded_df) == 0 # --------------------------------------------------------------------------- # Bug 3: Voronoi deduplication is first-seen-wins # --------------------------------------------------------------------------- class TestVoronoiDeduplication: """Multiple postcodes sharing a coordinate must all receive area.""" def test_shared_coords_both_postcodes_get_area(self, square_boundary): """Two postcodes with UPRNs at the same coords: both must get area.""" # Two postcodes each have one UPRN at (500050, 180050) # Plus postcode A has one at a different location points = np.array( [ [500020, 180050], # postcode A — unique location [500050, 180050], # postcode A — shared location [500050, 180050], # postcode B — shared location (same coords) [500080, 180050], # postcode B — unique location ] ) postcodes = ["A", "A", "B", "B"] result = compute_voronoi_regions(points, postcodes, square_boundary) assert "A" in result, "Postcode A should have Voronoi area" assert "B" in result, "Postcode B should have Voronoi area" def test_all_shared_coords_no_postcode_lost(self, square_boundary): """When all UPRNs for a postcode share coords with another, it must still get area.""" # Postcode B's only UPRN is at the same coords as postcode A's points = np.array( [ [500050, 180050], # postcode A [500050, 180050], # postcode B — identical coords ] ) postcodes = ["A", "B"] result = compute_voronoi_regions(points, postcodes, square_boundary) assert "A" in result, "Postcode A should have area" assert "B" in result, "Postcode B should have area" # Both should get roughly equal area since they're at the same location area_a = result["A"].area area_b = result["B"].area total = area_a + area_b assert area_a / total > 0.2, "Postcode A should have meaningful area" assert area_b / total > 0.2, "Postcode B should have meaningful area" def test_int64_coords_jitter_works(self, square_boundary): """Int64 coords (production dtype) must still jitter correctly.""" points = np.array([[500050, 180050], [500050, 180050]], dtype=np.int64) postcodes = ["A", "B"] result = compute_voronoi_regions(points, postcodes, square_boundary) assert "A" in result, "Postcode A missing with int64 coords" assert "B" in result, "Postcode B missing with int64 coords" # --------------------------------------------------------------------------- # Bug 4: Voronoi collinear fallback gives everything to first postcode # --------------------------------------------------------------------------- class TestVoronoiCollinear: """Collinear points (handled by dummy corners) must distribute area fairly.""" def test_collinear_points_all_postcodes_get_area(self, square_boundary): """Points along a line — every postcode must get area.""" points = np.array( [ [500020, 180050], [500040, 180050], [500060, 180050], [500080, 180050], ] ) postcodes = ["A", "A", "B", "B"] result = compute_voronoi_regions(points, postcodes, square_boundary) assert "A" in result, "Postcode A should have area" assert "B" in result, "Postcode B should have area" def test_collinear_points_area_roughly_fair(self, square_boundary): """With equal numbers of collinear points, area split should be roughly fair.""" points = np.array( [ [500030, 180050], [500070, 180050], ] ) postcodes = ["A", "B"] result = compute_voronoi_regions(points, postcodes, square_boundary) assert "A" in result and "B" in result area_a = result["A"].area area_b = result["B"].area ratio = min(area_a, area_b) / max(area_a, area_b) assert ratio > 0.3, f"Area split too unfair: {area_a:.0f} vs {area_b:.0f}" class TestEqualSplitFallback: """_equal_split_fallback must give every postcode some area.""" def test_all_postcodes_get_area(self, square_boundary): result = _equal_split_fallback(["A", "B", "C"], square_boundary) assert set(result.keys()) == {"A", "B", "C"} for pc, geom in result.items(): assert geom.area > 0, f"Postcode {pc} got zero area" def test_total_area_approximately_matches(self, square_boundary): result = _equal_split_fallback(["A", "B"], square_boundary) total = sum(g.area for g in result.values()) assert total == pytest.approx(square_boundary.area, rel=0.01) # --------------------------------------------------------------------------- # Bug 5: process_oa can produce non-polygon geometries from make_valid # --------------------------------------------------------------------------- class TestProcessOAGeometryTypes: """process_oa must return only Polygon/MultiPolygon fragments.""" def test_overlapping_inspire_no_postcode_overlap(self): """Overlapping INSPIRE parcels assigned to different postcodes must not overlap.""" oa_geom = box(500000, 180000, 500100, 180100) # Two overlapping parcels — left half and a wider middle section parcel_left = box(500000, 180000, 500060, 180100) parcel_right = box(500040, 180000, 500100, 180100) # overlaps left by 20m # UPRN in left parcel → postcode A, UPRN in right parcel → postcode B points = np.array( [ [500020, 180050], # postcode A — inside left parcel [500080, 180050], # postcode B — inside right parcel ] ) postcodes = ["A", "B"] fragments = process_oa( oa_geom, points, postcodes, inspire_candidates=[parcel_left, parcel_right] ) frag_dict = dict(fragments) assert "A" in frag_dict and "B" in frag_dict # The critical check: no overlap between the two fragments overlap = frag_dict["A"].intersection(frag_dict["B"]) assert overlap.area < 0.01, ( f"Postcodes A and B overlap by {overlap.area:.1f} sqm" ) def test_fragments_are_polygonal(self): """All fragments from process_oa must be Polygon or MultiPolygon.""" oa_geom = box(500000, 180000, 500100, 180100) points = np.array( [ [500020, 180020], [500080, 180080], ] ) postcodes = ["A", "B"] fragments = process_oa(oa_geom, points, postcodes, inspire_candidates=[]) for pc, geom in fragments: assert geom.geom_type in ("Polygon", "MultiPolygon"), ( f"Fragment for {pc} has unexpected type: {geom.geom_type}" ) assert not geom.is_empty, f"Fragment for {pc} is empty" def test_no_geometry_collection_in_output(self): """Even with tricky INSPIRE parcels, output should never be GeometryCollection.""" oa_geom = box(500000, 180000, 500100, 180100) # Create a thin sliver that make_valid might convert to a line sliver = Polygon( [ (500000, 180000), (500100, 180000), (500100, 180000.001), (500000, 180000), ] ) points = np.array( [ [500020, 180020], [500080, 180080], ] ) postcodes = ["A", "B"] fragments = process_oa(oa_geom, points, postcodes, inspire_candidates=[sliver]) for pc, geom in fragments: assert geom.geom_type in ("Polygon", "MultiPolygon"), ( f"Fragment for {pc} has type {geom.geom_type}" ) # --------------------------------------------------------------------------- # _extract_polygonal helper # --------------------------------------------------------------------------- class TestExtractPolygonal: """_extract_polygonal must strip non-polygon parts from geometry collections.""" def test_polygon_passthrough(self): poly = box(0, 0, 10, 10) assert _extract_polygonal(poly) is poly def test_multipolygon_passthrough(self): mp = MultiPolygon([box(0, 0, 10, 10), box(20, 20, 30, 30)]) assert _extract_polygonal(mp) is mp def test_geometry_collection_extracts_polygon(self): from shapely.geometry import GeometryCollection, LineString poly = box(0, 0, 10, 10) line = LineString([(0, 0), (10, 10)]) gc = GeometryCollection([poly, line]) result = _extract_polygonal(gc) assert result is not None assert result.geom_type == "Polygon" assert result.area == pytest.approx(poly.area) def test_geometry_collection_no_polygons_returns_none(self): from shapely.geometry import GeometryCollection, LineString, Point gc = GeometryCollection([LineString([(0, 0), (1, 1)]), Point(5, 5)]) assert _extract_polygonal(gc) is None def test_line_returns_none(self): from shapely.geometry import LineString assert _extract_polygonal(LineString([(0, 0), (1, 1)])) is None # --------------------------------------------------------------------------- # Edge case: merge_fragments handles single-OA postcodes # --------------------------------------------------------------------------- class TestMergeFragments: """merge_fragments must handle edge cases cleanly.""" def test_single_fragment_passthrough(self): """A postcode with one fragment should pass through unchanged.""" poly = box(0, 0, 100, 100) result = merge_fragments([("AA1 1AA", poly)]) assert "AA1 1AA" in result assert result["AA1 1AA"].equals(poly) def test_empty_fragments_excluded(self): """Empty geometries should not appear in output.""" empty = Polygon() result = merge_fragments([("AA1 1AA", empty)]) assert "AA1 1AA" not in result # --------------------------------------------------------------------------- # Edge case: to_wgs84_geojson handles degenerate geometries # --------------------------------------------------------------------------- class TestToWgs84Geojson: """to_wgs84_geojson must handle edge cases.""" def test_empty_geometry_returns_none(self): assert to_wgs84_geojson(Polygon()) is None def test_valid_polygon_returns_geojson(self): # Small square in BNG poly = box(530000, 180000, 530100, 180100) result = to_wgs84_geojson(poly) assert result is not None assert result["type"] == "Polygon" assert len(result["coordinates"]) >= 1 assert len(result["coordinates"][0]) >= 4 # closed ring def test_multipolygon_returns_largest(self): """MultiPolygon input should return only the largest polygon.""" big = box(530000, 180000, 530100, 180100) small = box(530200, 180200, 530210, 180210) mp = MultiPolygon([big, small]) result = to_wgs84_geojson(mp) assert result is not None assert result["type"] == "Polygon" def test_coordinates_have_limited_precision(self): """GeoJSON coordinates should be rounded to 6 decimal places.""" import json poly = box(530000, 180000, 530100, 180100) result = to_wgs84_geojson(poly) assert result is not None # Check precision via JSON serialization (what actually hits disk) for lon, lat in result["coordinates"][0]: lon_s = json.dumps(lon) lat_s = json.dumps(lat) lon_dp = len(lon_s.split(".")[1]) if "." in lon_s else 0 lat_dp = len(lat_s.split(".")[1]) if "." in lat_s else 0 assert lon_dp <= 6, f"Longitude {lon_s} has {lon_dp} decimal places" assert lat_dp <= 6, f"Latitude {lat_s} has {lat_dp} decimal places" # --------------------------------------------------------------------------- # Edge case: parse_gpkg_geometry rejects unknown envelope types # --------------------------------------------------------------------------- class TestParseGpkgGeometry: """parse_gpkg_geometry must raise on unknown envelope types.""" def test_unknown_envelope_type_raises(self): # Build a minimal GeoPackage blob with envelope_type=5 # Byte 3 = flags: envelope_type in bits 3-1, so type 5 = 0b00001010 blob = bytes([0x47, 0x50, 0x00, 0b00001010]) + b"\x00" * 100 with pytest.raises(ValueError, match="Unknown GeoPackage envelope type 5"): parse_gpkg_geometry(blob) # --------------------------------------------------------------------------- # _fill_holes removes interior rings # --------------------------------------------------------------------------- class TestFillHoles: """_fill_holes must remove all interior holes from polygons.""" def test_polygon_with_hole(self): """A polygon with an interior ring should become a solid polygon.""" outer = [(0, 0), (100, 0), (100, 100), (0, 100), (0, 0)] hole = [(30, 30), (70, 30), (70, 70), (30, 70), (30, 30)] poly_with_hole = Polygon(outer, [hole]) assert len(list(poly_with_hole.interiors)) == 1 result = _fill_holes(poly_with_hole) assert result.geom_type == "Polygon" assert len(list(result.interiors)) == 0 assert result.area == pytest.approx(Polygon(outer).area) def test_multipolygon_with_holes(self): """A MultiPolygon where each part has holes should have all holes removed.""" outer1 = [(0, 0), (50, 0), (50, 50), (0, 50), (0, 0)] hole1 = [(10, 10), (20, 10), (20, 20), (10, 20), (10, 10)] outer2 = [(60, 60), (110, 60), (110, 110), (60, 110), (60, 60)] hole2 = [(70, 70), (80, 70), (80, 80), (70, 80), (70, 70)] mp = MultiPolygon([Polygon(outer1, [hole1]), Polygon(outer2, [hole2])]) result = _fill_holes(mp) assert result.geom_type == "MultiPolygon" for p in result.geoms: assert len(list(p.interiors)) == 0 def test_polygon_without_hole_unchanged(self): """A polygon with no holes should pass through unchanged.""" poly = box(0, 0, 100, 100) result = _fill_holes(poly) assert result.area == pytest.approx(poly.area) # --------------------------------------------------------------------------- # Improved merge with 5m buffer closes 3m gaps # --------------------------------------------------------------------------- class TestMergeImprovedBuffer: """The 5m buffer should close gaps that the old 1m buffer could not.""" def test_3m_gap_merged(self): """Two fragments with a 3m gap should merge into a single polygon.""" left = box(0, 0, 50, 100) right = box(53, 0, 100, 100) # 3m gap at x=50..53 result = merge_fragments([("AA1 1AA", left), ("AA1 1AA", right)]) assert "AA1 1AA" in result geom = result["AA1 1AA"] assert geom.geom_type == "Polygon", ( f"Expected single Polygon after merging 3m gap, got {geom.geom_type}" ) def test_holes_removed_after_merge(self): """Interior holes created by merging should be filled.""" # Create a donut-like shape from fragments outer = box(0, 0, 100, 100) inner = box(30, 30, 70, 70) ring = outer.difference(inner) # Add the inner piece as a separate fragment result = merge_fragments([("AA1 1AA", ring), ("AA1 1AA", inner)]) assert "AA1 1AA" in result geom = result["AA1 1AA"] assert len(list(geom.interiors)) == 0, "Merged polygon should have no holes" # --------------------------------------------------------------------------- # subtract_greenspace # --------------------------------------------------------------------------- class TestSubtractGreenspace: """subtract_greenspace must remove park/water area from postcode polygons.""" def test_park_subtracted(self): """A park overlapping a postcode should reduce its area.""" from shapely.strtree import STRtree postcode = box(0, 0, 100, 100) # 10000 sqm park = box(60, 0, 100, 100) # 4000 sqm overlap on the right tree = STRtree([park]) geoms = [park] result = subtract_greenspace(postcode, tree, geoms) # Should have lost ~4000 sqm assert result.area == pytest.approx(6000, rel=0.01) def test_no_greenspace_unchanged(self): """With no overlapping greenspace, the geometry should be unchanged.""" from shapely.strtree import STRtree postcode = box(0, 0, 100, 100) park = box(200, 200, 300, 300) # far away tree = STRtree([park]) geoms = [park] result = subtract_greenspace(postcode, tree, geoms) assert result.area == pytest.approx(postcode.area) def test_full_overlap_preserves_postcode(self): """If greenspace covers the entire postcode, keep the original.""" from shapely.strtree import STRtree postcode = box(0, 0, 100, 100) park = box(-10, -10, 110, 110) # completely covers postcode tree = STRtree([park]) geoms = [park] result = subtract_greenspace(postcode, tree, geoms) # Should keep original since subtraction would erase entirely assert result.area == pytest.approx(postcode.area) def test_over_90pct_removal_preserves_postcode(self): """If greenspace would remove >90% of area, keep the original.""" from shapely.strtree import STRtree postcode = box(0, 0, 100, 100) # 10000 sqm park = box(5, 0, 100, 100) # 9500 sqm overlap = 95% removal tree = STRtree([park]) geoms = [park] result = subtract_greenspace(postcode, tree, geoms) # Should keep original since >90% would be removed assert result.area == pytest.approx(postcode.area) def test_under_90pct_removal_subtracts(self): """If greenspace removes <90%, subtraction should proceed.""" from shapely.strtree import STRtree postcode = box(0, 0, 100, 100) # 10000 sqm park = box(20, 0, 100, 100) # 8000 sqm overlap = 80% removal tree = STRtree([park]) geoms = [park] result = subtract_greenspace(postcode, tree, geoms) # 80% < 90% cap, so subtraction should happen assert result.area == pytest.approx(2000, rel=0.01)