Add postcode boundary calculation
This commit is contained in:
parent
f9bd218a3e
commit
f5e6894c0f
14 changed files with 1384 additions and 717 deletions
|
|
@ -0,0 +1,430 @@
|
|||
"""Tests for the postcode_boundaries module.
|
||||
|
||||
Each test targets a specific bug or edge case identified during code review.
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import polars as pl
|
||||
import pytest
|
||||
from shapely.geometry import MultiPolygon, Polygon, box
|
||||
|
||||
from .oa_boundaries import parse_gpkg_geometry
|
||||
from .output import merge_fragments, to_wgs84_geojson
|
||||
from .process_oa import _extract_polygonal, process_oa
|
||||
from .uprn import get_oa_uprns, load_uprns
|
||||
from .voronoi import _equal_split_fallback, compute_voronoi_regions
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def square_boundary():
|
||||
"""A 100x100m square OA boundary in BNG coords."""
|
||||
return box(500000, 180000, 500100, 180100)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def uprn_parquet(tmp_path):
|
||||
"""Write a minimal UPRN parquet file with 3 OAs, return its path."""
|
||||
df = pl.DataFrame(
|
||||
{
|
||||
"GRIDGB1E": [500010, 500020, 500030, 500040, 500050, 500060],
|
||||
"GRIDGB1N": [180010, 180020, 180030, 180040, 180050, 180060],
|
||||
"PCDS": ["AA1 1AA", "AA1 1AB", "BB2 2BB", "BB2 2BC", "CC3 3CC", "CC3 3CD"],
|
||||
"OA21CD": [
|
||||
"E00000001",
|
||||
"E00000001",
|
||||
"E00000002",
|
||||
"E00000002",
|
||||
"E00000003",
|
||||
"E00000003",
|
||||
],
|
||||
}
|
||||
)
|
||||
path = tmp_path / "uprn.parquet"
|
||||
df.write_parquet(path)
|
||||
return path
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Bug 1: First OA silently dropped
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFirstOADropped:
|
||||
"""The UPRN offset computation drops the first OA (alphabetically)."""
|
||||
|
||||
def test_first_oa_present_in_offsets(self, uprn_parquet):
|
||||
"""E00000001 is the first OA after sorting. It must appear in offsets."""
|
||||
df, offsets = load_uprns(uprn_parquet)
|
||||
assert "E00000001" in offsets, (
|
||||
"First OA (E00000001) missing from offsets — shift(1) null comparison bug"
|
||||
)
|
||||
|
||||
def test_all_oas_present(self, uprn_parquet):
|
||||
"""Every OA in the data must have an offset entry."""
|
||||
df, offsets = load_uprns(uprn_parquet)
|
||||
assert set(offsets.keys()) == {"E00000001", "E00000002", "E00000003"}
|
||||
|
||||
def test_first_oa_data_accessible(self, uprn_parquet):
|
||||
"""UPRNs for the first OA must be retrievable via get_oa_uprns."""
|
||||
df, offsets = load_uprns(uprn_parquet)
|
||||
points, postcodes = get_oa_uprns(df, offsets, "E00000001")
|
||||
assert len(postcodes) == 2
|
||||
assert set(postcodes) == {"AA1 1AA", "AA1 1AB"}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Bug 2: Whitespace-only postcodes slip through
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestWhitespacePostcodes:
|
||||
"""Postcodes that are only whitespace must be filtered out."""
|
||||
|
||||
def test_whitespace_postcodes_excluded(self, tmp_path):
|
||||
"""A PCDS value of ' ' should not survive loading."""
|
||||
df = pl.DataFrame(
|
||||
{
|
||||
"GRIDGB1E": [500010, 500020, 500030],
|
||||
"GRIDGB1N": [180010, 180020, 180030],
|
||||
"PCDS": ["AA1 1AA", " ", "BB2 2BB"],
|
||||
"OA21CD": ["E00000001", "E00000001", "E00000002"],
|
||||
}
|
||||
)
|
||||
path = tmp_path / "uprn.parquet"
|
||||
df.write_parquet(path)
|
||||
|
||||
loaded_df, offsets = load_uprns(path)
|
||||
all_postcodes = loaded_df["PCDS"].to_list()
|
||||
assert "" not in all_postcodes, "Empty string postcode survived strip+filter"
|
||||
assert " " not in all_postcodes, "Whitespace postcode survived filter"
|
||||
|
||||
def test_whitespace_only_oa_excluded(self, tmp_path):
|
||||
"""An OA where all UPRNs have whitespace-only postcodes should not appear."""
|
||||
df = pl.DataFrame(
|
||||
{
|
||||
"GRIDGB1E": [500010, 500020],
|
||||
"GRIDGB1N": [180010, 180020],
|
||||
"PCDS": [" ", " "],
|
||||
"OA21CD": ["E00000099", "E00000099"],
|
||||
}
|
||||
)
|
||||
path = tmp_path / "uprn.parquet"
|
||||
df.write_parquet(path)
|
||||
|
||||
loaded_df, _ = load_uprns(path)
|
||||
assert len(loaded_df) == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Bug 3: Voronoi deduplication is first-seen-wins
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestVoronoiDeduplication:
|
||||
"""Multiple postcodes sharing a coordinate must all receive area."""
|
||||
|
||||
def test_shared_coords_both_postcodes_get_area(self, square_boundary):
|
||||
"""Two postcodes with UPRNs at the same coords: both must get area."""
|
||||
# Two postcodes each have one UPRN at (500050, 180050)
|
||||
# Plus postcode A has one at a different location
|
||||
points = np.array(
|
||||
[
|
||||
[500020, 180050], # postcode A — unique location
|
||||
[500050, 180050], # postcode A — shared location
|
||||
[500050, 180050], # postcode B — shared location (same coords)
|
||||
[500080, 180050], # postcode B — unique location
|
||||
]
|
||||
)
|
||||
postcodes = ["A", "A", "B", "B"]
|
||||
result = compute_voronoi_regions(points, postcodes, square_boundary)
|
||||
assert "A" in result, "Postcode A should have Voronoi area"
|
||||
assert "B" in result, "Postcode B should have Voronoi area"
|
||||
|
||||
def test_all_shared_coords_no_postcode_lost(self, square_boundary):
|
||||
"""When all UPRNs for a postcode share coords with another, it must still get area."""
|
||||
# Postcode B's only UPRN is at the same coords as postcode A's
|
||||
points = np.array(
|
||||
[
|
||||
[500050, 180050], # postcode A
|
||||
[500050, 180050], # postcode B — identical coords
|
||||
]
|
||||
)
|
||||
postcodes = ["A", "B"]
|
||||
result = compute_voronoi_regions(points, postcodes, square_boundary)
|
||||
assert "A" in result, "Postcode A should have area"
|
||||
assert "B" in result, "Postcode B should have area"
|
||||
# Both should get roughly equal area since they're at the same location
|
||||
area_a = result["A"].area
|
||||
area_b = result["B"].area
|
||||
total = area_a + area_b
|
||||
assert area_a / total > 0.2, "Postcode A should have meaningful area"
|
||||
assert area_b / total > 0.2, "Postcode B should have meaningful area"
|
||||
|
||||
def test_int64_coords_jitter_works(self, square_boundary):
|
||||
"""Int64 coords (production dtype) must still jitter correctly."""
|
||||
points = np.array(
|
||||
[[500050, 180050], [500050, 180050]], dtype=np.int64
|
||||
)
|
||||
postcodes = ["A", "B"]
|
||||
result = compute_voronoi_regions(points, postcodes, square_boundary)
|
||||
assert "A" in result, "Postcode A missing with int64 coords"
|
||||
assert "B" in result, "Postcode B missing with int64 coords"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Bug 4: Voronoi collinear fallback gives everything to first postcode
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestVoronoiCollinear:
|
||||
"""Collinear points (handled by dummy corners) must distribute area fairly."""
|
||||
|
||||
def test_collinear_points_all_postcodes_get_area(self, square_boundary):
|
||||
"""Points along a line — every postcode must get area."""
|
||||
points = np.array(
|
||||
[
|
||||
[500020, 180050],
|
||||
[500040, 180050],
|
||||
[500060, 180050],
|
||||
[500080, 180050],
|
||||
]
|
||||
)
|
||||
postcodes = ["A", "A", "B", "B"]
|
||||
result = compute_voronoi_regions(points, postcodes, square_boundary)
|
||||
assert "A" in result, "Postcode A should have area"
|
||||
assert "B" in result, "Postcode B should have area"
|
||||
|
||||
def test_collinear_points_area_roughly_fair(self, square_boundary):
|
||||
"""With equal numbers of collinear points, area split should be roughly fair."""
|
||||
points = np.array(
|
||||
[
|
||||
[500030, 180050],
|
||||
[500070, 180050],
|
||||
]
|
||||
)
|
||||
postcodes = ["A", "B"]
|
||||
result = compute_voronoi_regions(points, postcodes, square_boundary)
|
||||
assert "A" in result and "B" in result
|
||||
area_a = result["A"].area
|
||||
area_b = result["B"].area
|
||||
ratio = min(area_a, area_b) / max(area_a, area_b)
|
||||
assert ratio > 0.3, f"Area split too unfair: {area_a:.0f} vs {area_b:.0f}"
|
||||
|
||||
|
||||
class TestEqualSplitFallback:
|
||||
"""_equal_split_fallback must give every postcode some area."""
|
||||
|
||||
def test_all_postcodes_get_area(self, square_boundary):
|
||||
result = _equal_split_fallback(["A", "B", "C"], square_boundary)
|
||||
assert set(result.keys()) == {"A", "B", "C"}
|
||||
for pc, geom in result.items():
|
||||
assert geom.area > 0, f"Postcode {pc} got zero area"
|
||||
|
||||
def test_total_area_approximately_matches(self, square_boundary):
|
||||
result = _equal_split_fallback(["A", "B"], square_boundary)
|
||||
total = sum(g.area for g in result.values())
|
||||
assert total == pytest.approx(square_boundary.area, rel=0.01)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Bug 5: process_oa can produce non-polygon geometries from make_valid
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestProcessOAGeometryTypes:
|
||||
"""process_oa must return only Polygon/MultiPolygon fragments."""
|
||||
|
||||
def test_overlapping_inspire_no_postcode_overlap(self):
|
||||
"""Overlapping INSPIRE parcels assigned to different postcodes must not overlap."""
|
||||
oa_geom = box(500000, 180000, 500100, 180100)
|
||||
# Two overlapping parcels — left half and a wider middle section
|
||||
parcel_left = box(500000, 180000, 500060, 180100)
|
||||
parcel_right = box(500040, 180000, 500100, 180100) # overlaps left by 20m
|
||||
# UPRN in left parcel → postcode A, UPRN in right parcel → postcode B
|
||||
points = np.array(
|
||||
[
|
||||
[500020, 180050], # postcode A — inside left parcel
|
||||
[500080, 180050], # postcode B — inside right parcel
|
||||
]
|
||||
)
|
||||
postcodes = ["A", "B"]
|
||||
fragments = process_oa(
|
||||
oa_geom, points, postcodes, inspire_candidates=[parcel_left, parcel_right]
|
||||
)
|
||||
frag_dict = dict(fragments)
|
||||
assert "A" in frag_dict and "B" in frag_dict
|
||||
# The critical check: no overlap between the two fragments
|
||||
overlap = frag_dict["A"].intersection(frag_dict["B"])
|
||||
assert overlap.area < 0.01, (
|
||||
f"Postcodes A and B overlap by {overlap.area:.1f} sqm"
|
||||
)
|
||||
|
||||
def test_fragments_are_polygonal(self):
|
||||
"""All fragments from process_oa must be Polygon or MultiPolygon."""
|
||||
oa_geom = box(500000, 180000, 500100, 180100)
|
||||
points = np.array(
|
||||
[
|
||||
[500020, 180020],
|
||||
[500080, 180080],
|
||||
]
|
||||
)
|
||||
postcodes = ["A", "B"]
|
||||
fragments = process_oa(oa_geom, points, postcodes, inspire_candidates=[])
|
||||
for pc, geom in fragments:
|
||||
assert geom.geom_type in ("Polygon", "MultiPolygon"), (
|
||||
f"Fragment for {pc} has unexpected type: {geom.geom_type}"
|
||||
)
|
||||
assert not geom.is_empty, f"Fragment for {pc} is empty"
|
||||
|
||||
def test_no_geometry_collection_in_output(self):
|
||||
"""Even with tricky INSPIRE parcels, output should never be GeometryCollection."""
|
||||
oa_geom = box(500000, 180000, 500100, 180100)
|
||||
# Create a thin sliver that make_valid might convert to a line
|
||||
sliver = Polygon(
|
||||
[
|
||||
(500000, 180000),
|
||||
(500100, 180000),
|
||||
(500100, 180000.001),
|
||||
(500000, 180000),
|
||||
]
|
||||
)
|
||||
points = np.array(
|
||||
[
|
||||
[500020, 180020],
|
||||
[500080, 180080],
|
||||
]
|
||||
)
|
||||
postcodes = ["A", "B"]
|
||||
fragments = process_oa(oa_geom, points, postcodes, inspire_candidates=[sliver])
|
||||
for pc, geom in fragments:
|
||||
assert geom.geom_type in ("Polygon", "MultiPolygon"), (
|
||||
f"Fragment for {pc} has type {geom.geom_type}"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _extract_polygonal helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExtractPolygonal:
|
||||
"""_extract_polygonal must strip non-polygon parts from geometry collections."""
|
||||
|
||||
def test_polygon_passthrough(self):
|
||||
poly = box(0, 0, 10, 10)
|
||||
assert _extract_polygonal(poly) is poly
|
||||
|
||||
def test_multipolygon_passthrough(self):
|
||||
mp = MultiPolygon([box(0, 0, 10, 10), box(20, 20, 30, 30)])
|
||||
assert _extract_polygonal(mp) is mp
|
||||
|
||||
def test_geometry_collection_extracts_polygon(self):
|
||||
from shapely.geometry import GeometryCollection, LineString
|
||||
|
||||
poly = box(0, 0, 10, 10)
|
||||
line = LineString([(0, 0), (10, 10)])
|
||||
gc = GeometryCollection([poly, line])
|
||||
result = _extract_polygonal(gc)
|
||||
assert result is not None
|
||||
assert result.geom_type == "Polygon"
|
||||
assert result.area == pytest.approx(poly.area)
|
||||
|
||||
def test_geometry_collection_no_polygons_returns_none(self):
|
||||
from shapely.geometry import GeometryCollection, LineString, Point
|
||||
|
||||
gc = GeometryCollection([LineString([(0, 0), (1, 1)]), Point(5, 5)])
|
||||
assert _extract_polygonal(gc) is None
|
||||
|
||||
def test_line_returns_none(self):
|
||||
from shapely.geometry import LineString
|
||||
|
||||
assert _extract_polygonal(LineString([(0, 0), (1, 1)])) is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Edge case: merge_fragments handles single-OA postcodes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMergeFragments:
|
||||
"""merge_fragments must handle edge cases cleanly."""
|
||||
|
||||
def test_single_fragment_passthrough(self):
|
||||
"""A postcode with one fragment should pass through unchanged."""
|
||||
poly = box(0, 0, 100, 100)
|
||||
result = merge_fragments([("AA1 1AA", poly)])
|
||||
assert "AA1 1AA" in result
|
||||
assert result["AA1 1AA"].equals(poly)
|
||||
|
||||
def test_empty_fragments_excluded(self):
|
||||
"""Empty geometries should not appear in output."""
|
||||
empty = Polygon()
|
||||
result = merge_fragments([("AA1 1AA", empty)])
|
||||
assert "AA1 1AA" not in result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Edge case: to_wgs84_geojson handles degenerate geometries
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestToWgs84Geojson:
|
||||
"""to_wgs84_geojson must handle edge cases."""
|
||||
|
||||
def test_empty_geometry_returns_none(self):
|
||||
assert to_wgs84_geojson(Polygon()) is None
|
||||
|
||||
def test_valid_polygon_returns_geojson(self):
|
||||
# Small square in BNG
|
||||
poly = box(530000, 180000, 530100, 180100)
|
||||
result = to_wgs84_geojson(poly)
|
||||
assert result is not None
|
||||
assert result["type"] == "Polygon"
|
||||
assert len(result["coordinates"]) >= 1
|
||||
assert len(result["coordinates"][0]) >= 4 # closed ring
|
||||
|
||||
def test_multipolygon_returns_largest(self):
|
||||
"""MultiPolygon input should return only the largest polygon."""
|
||||
big = box(530000, 180000, 530100, 180100)
|
||||
small = box(530200, 180200, 530210, 180210)
|
||||
mp = MultiPolygon([big, small])
|
||||
result = to_wgs84_geojson(mp)
|
||||
assert result is not None
|
||||
assert result["type"] == "Polygon"
|
||||
|
||||
def test_coordinates_have_limited_precision(self):
|
||||
"""GeoJSON coordinates should be rounded to 6 decimal places."""
|
||||
import json
|
||||
|
||||
poly = box(530000, 180000, 530100, 180100)
|
||||
result = to_wgs84_geojson(poly)
|
||||
assert result is not None
|
||||
# Check precision via JSON serialization (what actually hits disk)
|
||||
for lon, lat in result["coordinates"][0]:
|
||||
lon_s = json.dumps(lon)
|
||||
lat_s = json.dumps(lat)
|
||||
lon_dp = len(lon_s.split(".")[1]) if "." in lon_s else 0
|
||||
lat_dp = len(lat_s.split(".")[1]) if "." in lat_s else 0
|
||||
assert lon_dp <= 6, f"Longitude {lon_s} has {lon_dp} decimal places"
|
||||
assert lat_dp <= 6, f"Latitude {lat_s} has {lat_dp} decimal places"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Edge case: parse_gpkg_geometry rejects unknown envelope types
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestParseGpkgGeometry:
|
||||
"""parse_gpkg_geometry must raise on unknown envelope types."""
|
||||
|
||||
def test_unknown_envelope_type_raises(self):
|
||||
# Build a minimal GeoPackage blob with envelope_type=5
|
||||
# Byte 3 = flags: envelope_type in bits 3-1, so type 5 = 0b00001010
|
||||
blob = bytes([0x47, 0x50, 0x00, 0b00001010]) + b"\x00" * 100
|
||||
with pytest.raises(ValueError, match="Unknown GeoPackage envelope type 5"):
|
||||
parse_gpkg_geometry(blob)
|
||||
Loading…
Add table
Add a link
Reference in a new issue