1738 lines
69 KiB
Python
1738 lines
69 KiB
Python
"""Tests for the postcode_boundaries module.
|
|
|
|
Each test targets a specific bug or edge case identified during code review.
|
|
"""
|
|
|
|
import json
|
|
|
|
import numpy as np
|
|
import polars as pl
|
|
import pytest
|
|
from shapely.geometry import MultiPolygon, Polygon, box
|
|
from shapely.ops import unary_union
|
|
|
|
from .fragments_cache import (
|
|
fragments_cache_is_fresh,
|
|
load_fragments,
|
|
save_fragments,
|
|
)
|
|
from .__main__ import _oa_fragments, _process_oas
|
|
from .inspire import build_inspire_index
|
|
from .oa_boundaries import parse_gpkg_geometry
|
|
from .greenspace import subtract_greenspace
|
|
from .output import (
|
|
_fill_holes,
|
|
merge_fragments,
|
|
to_wgs84_geojson,
|
|
to_wgs84_geojson_multi,
|
|
write_district_geojson,
|
|
)
|
|
from .process_oa import MIN_GEOM_AREA, _extract_polygonal, process_oa
|
|
from .uprn import get_oa_uprns, load_uprns
|
|
from .voronoi import _equal_split_fallback, compute_voronoi_regions
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
|
|
def square_boundary():
|
|
"""A 100x100m square OA boundary in BNG coords."""
|
|
return box(500000, 180000, 500100, 180100)
|
|
|
|
|
|
@pytest.fixture
|
|
def uprn_parquet(tmp_path):
|
|
"""Write a minimal UPRN parquet file with 3 OAs, return its path."""
|
|
df = pl.DataFrame(
|
|
{
|
|
"GRIDGB1E": [500010, 500020, 500030, 500040, 500050, 500060],
|
|
"GRIDGB1N": [180010, 180020, 180030, 180040, 180050, 180060],
|
|
"PCDS": ["AA1 1AA", "AA1 1AB", "BB2 2BB", "BB2 2BC", "CC3 3CC", "CC3 3CD"],
|
|
"OA21CD": [
|
|
"E00000001",
|
|
"E00000001",
|
|
"E00000002",
|
|
"E00000002",
|
|
"E00000003",
|
|
"E00000003",
|
|
],
|
|
}
|
|
)
|
|
path = tmp_path / "uprn.parquet"
|
|
df.write_parquet(path)
|
|
return path
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bug 1: First OA silently dropped
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestFirstOADropped:
|
|
"""The UPRN offset computation drops the first OA (alphabetically)."""
|
|
|
|
def test_first_oa_present_in_offsets(self, uprn_parquet):
|
|
"""E00000001 is the first OA after sorting. It must appear in offsets."""
|
|
df, offsets = load_uprns(uprn_parquet)
|
|
assert "E00000001" in offsets, (
|
|
"First OA (E00000001) missing from offsets — shift(1) null comparison bug"
|
|
)
|
|
|
|
def test_all_oas_present(self, uprn_parquet):
|
|
"""Every OA in the data must have an offset entry."""
|
|
df, offsets = load_uprns(uprn_parquet)
|
|
assert set(offsets.keys()) == {"E00000001", "E00000002", "E00000003"}
|
|
|
|
def test_first_oa_data_accessible(self, uprn_parquet):
|
|
"""UPRNs for the first OA must be retrievable via get_oa_uprns."""
|
|
df, offsets = load_uprns(uprn_parquet)
|
|
points, postcodes = get_oa_uprns(df, offsets, "E00000001")
|
|
assert len(postcodes) == 2
|
|
assert set(postcodes) == {"AA1 1AA", "AA1 1AB"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bug 2: Whitespace-only postcodes slip through
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestWhitespacePostcodes:
|
|
"""Postcodes that are only whitespace must be filtered out."""
|
|
|
|
def test_whitespace_postcodes_excluded(self, tmp_path):
|
|
"""A PCDS value of ' ' should not survive loading."""
|
|
df = pl.DataFrame(
|
|
{
|
|
"GRIDGB1E": [500010, 500020, 500030],
|
|
"GRIDGB1N": [180010, 180020, 180030],
|
|
"PCDS": ["AA1 1AA", " ", "BB2 2BB"],
|
|
"OA21CD": ["E00000001", "E00000001", "E00000002"],
|
|
}
|
|
)
|
|
path = tmp_path / "uprn.parquet"
|
|
df.write_parquet(path)
|
|
|
|
loaded_df, offsets = load_uprns(path)
|
|
all_postcodes = loaded_df["PCDS"].to_list()
|
|
assert "" not in all_postcodes, "Empty string postcode survived strip+filter"
|
|
assert " " not in all_postcodes, "Whitespace postcode survived filter"
|
|
|
|
def test_whitespace_only_oa_excluded(self, tmp_path):
|
|
"""An OA where all UPRNs have whitespace-only postcodes should not appear."""
|
|
df = pl.DataFrame(
|
|
{
|
|
"GRIDGB1E": [500010, 500020],
|
|
"GRIDGB1N": [180010, 180020],
|
|
"PCDS": [" ", " "],
|
|
"OA21CD": ["E00000099", "E00000099"],
|
|
}
|
|
)
|
|
path = tmp_path / "uprn.parquet"
|
|
df.write_parquet(path)
|
|
|
|
loaded_df, _ = load_uprns(path)
|
|
assert len(loaded_df) == 0
|
|
|
|
def test_non_english_oas_excluded(self, tmp_path):
|
|
df = pl.DataFrame(
|
|
{
|
|
"GRIDGB1E": [500010, 300010],
|
|
"GRIDGB1N": [180010, 220010],
|
|
"PCDS": ["AA1 1AA", "CF1 1AA"],
|
|
"OA21CD": ["E00000001", "W00000001"],
|
|
}
|
|
)
|
|
path = tmp_path / "uprn.parquet"
|
|
df.write_parquet(path)
|
|
|
|
loaded_df, offsets = load_uprns(path)
|
|
|
|
assert set(offsets) == {"E00000001"}
|
|
assert loaded_df["PCDS"].to_list() == ["AA1 1AA"]
|
|
|
|
def test_terminated_postcodes_are_remapped(self, tmp_path):
|
|
uprns = pl.DataFrame(
|
|
{
|
|
"GRIDGB1E": [500010],
|
|
"GRIDGB1N": [180010],
|
|
"PCDS": ["aa1 1aa"],
|
|
"OA21CD": ["E00000001"],
|
|
}
|
|
)
|
|
uprn_path = tmp_path / "uprn.parquet"
|
|
uprns.write_parquet(uprn_path)
|
|
arcgis = pl.DataFrame(
|
|
{
|
|
"pcds": ["AA1 1AA", "AA1 1AB"],
|
|
"east1m": [500010, 500030],
|
|
"north1m": [180010, 180020],
|
|
"oa21cd": ["E00000001", "E00000001"],
|
|
"doterm": ["2020-01-01", None],
|
|
"ctry25cd": ["E92000001", "E92000001"],
|
|
}
|
|
)
|
|
arcgis_path = tmp_path / "arcgis.parquet"
|
|
arcgis.write_parquet(arcgis_path)
|
|
|
|
loaded_df, _offsets = load_uprns(uprn_path, arcgis_path)
|
|
|
|
assert loaded_df["PCDS"].to_list() == ["AA1 1AB"]
|
|
|
|
def test_remapped_terminated_postcode_adopts_successor_oa(self, tmp_path):
|
|
"""When a terminated postcode is remapped to its active successor, the
|
|
remapped seed point must carry the SUCCESSOR's OA (and coords), not the
|
|
terminated postcode's original OA. Pre-fix the row kept OA21CD of the
|
|
terminated postcode, seeding the successor into an OA it doesn't belong
|
|
to and splitting its boundary across OAs."""
|
|
# Terminated AA1 1AA sits in OA E00000001. Its nearest active successor
|
|
# AA1 1AB lives in a DIFFERENT OA (E00000002) far away.
|
|
uprns = pl.DataFrame(
|
|
{
|
|
"GRIDGB1E": [500010],
|
|
"GRIDGB1N": [180010],
|
|
"PCDS": ["AA1 1AA"],
|
|
"OA21CD": ["E00000001"],
|
|
}
|
|
)
|
|
uprn_path = tmp_path / "uprn.parquet"
|
|
uprns.write_parquet(uprn_path)
|
|
arcgis = pl.DataFrame(
|
|
{
|
|
"pcds": ["AA1 1AA", "AA1 1AB"],
|
|
"east1m": [500010, 500030],
|
|
"north1m": [180010, 180020],
|
|
# AA1 1AA terminated → only AA1 1AB is an active successor, and
|
|
# it belongs to a different OA than the terminated postcode.
|
|
"oa21cd": ["E00000001", "E00000002"],
|
|
"doterm": ["2020-01-01", None],
|
|
"ctry25cd": ["E92000001", "E92000001"],
|
|
}
|
|
)
|
|
arcgis_path = tmp_path / "arcgis.parquet"
|
|
arcgis.write_parquet(arcgis_path)
|
|
|
|
loaded_df, offsets = load_uprns(uprn_path, arcgis_path)
|
|
|
|
# The remapped point must be grouped under the successor's OA, not the
|
|
# terminated postcode's OA.
|
|
assert "E00000002" in offsets, "Successor OA missing — remap kept old OA"
|
|
assert "E00000001" not in offsets, (
|
|
"Remapped point still lives in the terminated postcode's OA"
|
|
)
|
|
points, postcodes = get_oa_uprns(loaded_df, offsets, "E00000002")
|
|
assert postcodes == ["AA1 1AB"]
|
|
# It should also adopt the successor's authoritative coordinates.
|
|
assert points.tolist() == [[500030.0, 180020.0]]
|
|
|
|
def test_arcgis_filters_to_active_english_postcodes(self, tmp_path):
|
|
uprns = pl.DataFrame(
|
|
{
|
|
"GRIDGB1E": [500010, 500020],
|
|
"GRIDGB1N": [180010, 180020],
|
|
"PCDS": ["AA1 1AA", "CF1 1AA"],
|
|
"OA21CD": ["E00000001", "E00000001"],
|
|
}
|
|
)
|
|
uprn_path = tmp_path / "uprn.parquet"
|
|
uprns.write_parquet(uprn_path)
|
|
arcgis = pl.DataFrame(
|
|
{
|
|
"pcds": ["AA1 1AA", "CF1 1AA"],
|
|
"east1m": [500010, 300010],
|
|
"north1m": [180010, 220010],
|
|
"oa21cd": ["E00000001", "W00000001"],
|
|
"doterm": [None, None],
|
|
"ctry25cd": ["E92000001", "W92000004"],
|
|
}
|
|
)
|
|
arcgis_path = tmp_path / "arcgis.parquet"
|
|
arcgis.write_parquet(arcgis_path)
|
|
|
|
loaded_df, _offsets = load_uprns(uprn_path, arcgis_path)
|
|
|
|
assert loaded_df["PCDS"].to_list() == ["AA1 1AA"]
|
|
|
|
def test_arcgis_adds_centroid_seed_for_active_postcode_without_uprn(self, tmp_path):
|
|
uprns = pl.DataFrame(
|
|
{
|
|
"GRIDGB1E": [500010],
|
|
"GRIDGB1N": [180010],
|
|
"PCDS": ["AA1 1AA"],
|
|
"OA21CD": ["E00000001"],
|
|
}
|
|
)
|
|
uprn_path = tmp_path / "uprn.parquet"
|
|
uprns.write_parquet(uprn_path)
|
|
arcgis = pl.DataFrame(
|
|
{
|
|
"pcds": ["AA1 1AA", "BB1 1BB"],
|
|
"east1m": [500010, 510000],
|
|
"north1m": [180010, 190000],
|
|
"oa21cd": ["E00000001", "E00000002"],
|
|
"doterm": [None, None],
|
|
"ctry25cd": ["E92000001", "E92000001"],
|
|
}
|
|
)
|
|
arcgis_path = tmp_path / "arcgis.parquet"
|
|
arcgis.write_parquet(arcgis_path)
|
|
|
|
loaded_df, offsets = load_uprns(uprn_path, arcgis_path)
|
|
|
|
assert set(loaded_df["PCDS"].to_list()) == {"AA1 1AA", "BB1 1BB"}
|
|
points, postcodes = get_oa_uprns(loaded_df, offsets, "E00000002")
|
|
assert postcodes == ["BB1 1BB"]
|
|
assert points.tolist() == [[510000.0, 190000.0]]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bug 3: Voronoi deduplication is first-seen-wins
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestVoronoiDeduplication:
|
|
"""Multiple postcodes sharing a coordinate must all receive area."""
|
|
|
|
def test_shared_coords_both_postcodes_get_area(self, square_boundary):
|
|
"""Two postcodes with UPRNs at the same coords: both must get area."""
|
|
# Two postcodes each have one UPRN at (500050, 180050)
|
|
# Plus postcode A has one at a different location
|
|
points = np.array(
|
|
[
|
|
[500020, 180050], # postcode A — unique location
|
|
[500050, 180050], # postcode A — shared location
|
|
[500050, 180050], # postcode B — shared location (same coords)
|
|
[500080, 180050], # postcode B — unique location
|
|
]
|
|
)
|
|
postcodes = ["A", "A", "B", "B"]
|
|
result = compute_voronoi_regions(points, postcodes, square_boundary)
|
|
assert "A" in result, "Postcode A should have Voronoi area"
|
|
assert "B" in result, "Postcode B should have Voronoi area"
|
|
|
|
def test_all_shared_coords_no_postcode_lost(self, square_boundary):
|
|
"""When all UPRNs for a postcode share coords with another, it must still get area."""
|
|
# Postcode B's only UPRN is at the same coords as postcode A's
|
|
points = np.array(
|
|
[
|
|
[500050, 180050], # postcode A
|
|
[500050, 180050], # postcode B — identical coords
|
|
]
|
|
)
|
|
postcodes = ["A", "B"]
|
|
result = compute_voronoi_regions(points, postcodes, square_boundary)
|
|
assert "A" in result, "Postcode A should have area"
|
|
assert "B" in result, "Postcode B should have area"
|
|
# Both should get roughly equal area since they're at the same location
|
|
area_a = result["A"].area
|
|
area_b = result["B"].area
|
|
total = area_a + area_b
|
|
assert area_a / total > 0.2, "Postcode A should have meaningful area"
|
|
assert area_b / total > 0.2, "Postcode B should have meaningful area"
|
|
|
|
def test_int64_coords_jitter_works(self, square_boundary):
|
|
"""Int64 coords (production dtype) must still jitter correctly."""
|
|
points = np.array([[500050, 180050], [500050, 180050]], dtype=np.int64)
|
|
postcodes = ["A", "B"]
|
|
result = compute_voronoi_regions(points, postcodes, square_boundary)
|
|
assert "A" in result, "Postcode A missing with int64 coords"
|
|
assert "B" in result, "Postcode B missing with int64 coords"
|
|
|
|
|
|
class TestVoronoiCoincidentClusterNotCrushed:
|
|
"""3+ postcodes at one coordinate must each keep a real cell.
|
|
|
|
Pre-fix, the first coincident postcode stayed unjittered at the exact
|
|
cluster centre; with other seeds in the OA its Voronoi cell was squeezed
|
|
below MIN_GEOM_AREA, so _clean_polygonal dropped that active postcode
|
|
downstream. The fix spreads coincident postcodes onto a small regular
|
|
polygon (equal wedges), so none is crushed.
|
|
"""
|
|
|
|
def test_coincident_cluster_plus_outer_seed_no_postcode_crushed(self):
|
|
# A block of flats: 4 distinct postcodes share one building coordinate,
|
|
# plus one other postcode elsewhere in the OA. Pre-fix, the centre seed's
|
|
# cell collapsed to ~0.0001 m^2 (< MIN_GEOM_AREA) and the postcode was
|
|
# dropped; every postcode must now keep a non-degenerate cell.
|
|
boundary = box(0, 0, 1000, 1000)
|
|
points = np.array(
|
|
[
|
|
[500, 500], # A — coincident
|
|
[500, 500], # B — coincident
|
|
[500, 500], # C — coincident
|
|
[500, 500], # D — coincident
|
|
[100, 100], # OUT — elsewhere in the OA
|
|
],
|
|
dtype=np.float64,
|
|
)
|
|
postcodes = ["A", "B", "C", "D", "OUT"]
|
|
result = compute_voronoi_regions(points, postcodes, boundary)
|
|
for pc in postcodes:
|
|
assert pc in result, f"Postcode {pc} was dropped"
|
|
assert result[pc].area > MIN_GEOM_AREA, (
|
|
f"Postcode {pc} cell {result[pc].area} <= MIN_GEOM_AREA"
|
|
)
|
|
|
|
def test_coincident_cluster_partitions_into_fair_wedges(self, square_boundary):
|
|
# N postcodes sharing one coordinate split the surrounding area into
|
|
# roughly equal wedges (regular-polygon seeds), none degenerate.
|
|
points = np.array([[500050, 180050]] * 5, dtype=np.float64)
|
|
postcodes = ["A", "B", "C", "D", "E"]
|
|
result = compute_voronoi_regions(points, postcodes, square_boundary)
|
|
fair_share = square_boundary.area / len(postcodes)
|
|
for pc in postcodes:
|
|
assert pc in result, f"Postcode {pc} was dropped"
|
|
# Each wedge is a meaningful fraction of its fair share (not crushed).
|
|
assert result[pc].area > 0.3 * fair_share, (
|
|
f"Postcode {pc} cell {result[pc].area} far below fair share {fair_share}"
|
|
)
|
|
|
|
def test_two_coincident_split_is_fair(self, square_boundary):
|
|
"""Regression: two postcodes at one coordinate split ~50/50."""
|
|
points = np.array([[500050, 180050], [500050, 180050]], dtype=np.float64)
|
|
postcodes = ["A", "B"]
|
|
result = compute_voronoi_regions(points, postcodes, square_boundary)
|
|
assert "A" in result and "B" in result
|
|
total = result["A"].area + result["B"].area
|
|
assert result["A"].area / total > 0.4
|
|
assert result["B"].area / total > 0.4
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bug 4: Voronoi collinear fallback gives everything to first postcode
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestVoronoiCollinear:
|
|
"""Collinear points (handled by dummy corners) must distribute area fairly."""
|
|
|
|
def test_collinear_points_all_postcodes_get_area(self, square_boundary):
|
|
"""Points along a line — every postcode must get area."""
|
|
points = np.array(
|
|
[
|
|
[500020, 180050],
|
|
[500040, 180050],
|
|
[500060, 180050],
|
|
[500080, 180050],
|
|
]
|
|
)
|
|
postcodes = ["A", "A", "B", "B"]
|
|
result = compute_voronoi_regions(points, postcodes, square_boundary)
|
|
assert "A" in result, "Postcode A should have area"
|
|
assert "B" in result, "Postcode B should have area"
|
|
|
|
def test_collinear_points_area_roughly_fair(self, square_boundary):
|
|
"""With equal numbers of collinear points, area split should be roughly fair."""
|
|
points = np.array(
|
|
[
|
|
[500030, 180050],
|
|
[500070, 180050],
|
|
]
|
|
)
|
|
postcodes = ["A", "B"]
|
|
result = compute_voronoi_regions(points, postcodes, square_boundary)
|
|
assert "A" in result and "B" in result
|
|
area_a = result["A"].area
|
|
area_b = result["B"].area
|
|
ratio = min(area_a, area_b) / max(area_a, area_b)
|
|
assert ratio > 0.3, f"Area split too unfair: {area_a:.0f} vs {area_b:.0f}"
|
|
|
|
|
|
class TestVoronoiCoverage:
|
|
"""Voronoi fallback should cover large OAs even when UPRNs are clustered."""
|
|
|
|
def test_clustered_points_cover_large_boundary(self):
|
|
boundary = box(0, 0, 5000, 100)
|
|
points = np.array([[10, 50], [20, 50]])
|
|
result = compute_voronoi_regions(points, ["A", "B"], boundary)
|
|
|
|
covered = unary_union(list(result.values()))
|
|
|
|
assert covered.area == pytest.approx(boundary.area)
|
|
assert boundary.difference(covered).area < 0.01
|
|
|
|
|
|
class TestEqualSplitFallback:
|
|
"""_equal_split_fallback must give every postcode some area."""
|
|
|
|
def test_all_postcodes_get_area(self, square_boundary):
|
|
result = _equal_split_fallback(["A", "B", "C"], square_boundary)
|
|
assert set(result.keys()) == {"A", "B", "C"}
|
|
for pc, geom in result.items():
|
|
assert geom.area > 0, f"Postcode {pc} got zero area"
|
|
|
|
def test_total_area_approximately_matches(self, square_boundary):
|
|
result = _equal_split_fallback(["A", "B"], square_boundary)
|
|
total = sum(g.area for g in result.values())
|
|
assert total == pytest.approx(square_boundary.area, rel=0.01)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bug 5: process_oa can produce non-polygon geometries from make_valid
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProcessOAGeometryTypes:
|
|
"""process_oa must return only Polygon/MultiPolygon fragments."""
|
|
|
|
def test_overlapping_inspire_no_postcode_overlap(self):
|
|
"""Overlapping INSPIRE parcels assigned to different postcodes must not overlap."""
|
|
oa_geom = box(500000, 180000, 500100, 180100)
|
|
# Two overlapping parcels — left half and a wider middle section
|
|
parcel_left = box(500000, 180000, 500060, 180100)
|
|
parcel_right = box(500040, 180000, 500100, 180100) # overlaps left by 20m
|
|
# UPRN in left parcel → postcode A, UPRN in right parcel → postcode B
|
|
points = np.array(
|
|
[
|
|
[500020, 180050], # postcode A — inside left parcel
|
|
[500080, 180050], # postcode B — inside right parcel
|
|
]
|
|
)
|
|
postcodes = ["A", "B"]
|
|
fragments = process_oa(
|
|
oa_geom, points, postcodes, inspire_candidates=[parcel_left, parcel_right]
|
|
)
|
|
frag_dict = dict(fragments)
|
|
assert "A" in frag_dict and "B" in frag_dict
|
|
# The critical check: no overlap between the two fragments
|
|
overlap = frag_dict["A"].intersection(frag_dict["B"])
|
|
assert overlap.area < 0.01, (
|
|
f"Postcodes A and B overlap by {overlap.area:.1f} sqm"
|
|
)
|
|
|
|
def test_fragments_are_polygonal(self):
|
|
"""All fragments from process_oa must be Polygon or MultiPolygon."""
|
|
oa_geom = box(500000, 180000, 500100, 180100)
|
|
points = np.array(
|
|
[
|
|
[500020, 180020],
|
|
[500080, 180080],
|
|
]
|
|
)
|
|
postcodes = ["A", "B"]
|
|
fragments = process_oa(oa_geom, points, postcodes, inspire_candidates=[])
|
|
for pc, geom in fragments:
|
|
assert geom.geom_type in ("Polygon", "MultiPolygon"), (
|
|
f"Fragment for {pc} has unexpected type: {geom.geom_type}"
|
|
)
|
|
assert not geom.is_empty, f"Fragment for {pc} is empty"
|
|
|
|
def test_no_geometry_collection_in_output(self):
|
|
"""Even with tricky INSPIRE parcels, output should never be GeometryCollection."""
|
|
oa_geom = box(500000, 180000, 500100, 180100)
|
|
# Create a thin sliver that make_valid might convert to a line
|
|
sliver = Polygon(
|
|
[
|
|
(500000, 180000),
|
|
(500100, 180000),
|
|
(500100, 180000.001),
|
|
(500000, 180000),
|
|
]
|
|
)
|
|
points = np.array(
|
|
[
|
|
[500020, 180020],
|
|
[500080, 180080],
|
|
]
|
|
)
|
|
postcodes = ["A", "B"]
|
|
fragments = process_oa(oa_geom, points, postcodes, inspire_candidates=[sliver])
|
|
for pc, geom in fragments:
|
|
assert geom.geom_type in ("Polygon", "MultiPolygon"), (
|
|
f"Fragment for {pc} has type {geom.geom_type}"
|
|
)
|
|
|
|
|
|
class TestProcessOAInspireParcelAssignment:
|
|
"""INSPIRE parcels without UPRNs should still shape postcode boundaries."""
|
|
|
|
def test_unoccupied_inspire_parcel_goes_to_nearest_postcode(self):
|
|
"""A parcel with no contained UPRN should not be split by Voronoi."""
|
|
oa_geom = box(0, 0, 100, 100)
|
|
parcel = box(20, 40, 65, 60) # crosses the x=50 Voronoi split
|
|
points = np.array(
|
|
[
|
|
[10, 50], # postcode A
|
|
[90, 50], # postcode B
|
|
]
|
|
)
|
|
postcodes = ["A", "B"]
|
|
|
|
fragments = process_oa(oa_geom, points, postcodes, inspire_candidates=[parcel])
|
|
frag_dict = dict(fragments)
|
|
|
|
assert "A" in frag_dict and "B" in frag_dict
|
|
assert parcel.difference(frag_dict["A"]).area < 0.01
|
|
assert frag_dict["B"].intersection(parcel).area < 0.01
|
|
|
|
def test_contained_uprn_claim_wins_over_overlapping_nearest_parcel(self):
|
|
"""Contained-UPRN parcel claims should keep priority over nearest claims."""
|
|
oa_geom = box(0, 0, 100, 100)
|
|
contained_a = box(0, 0, 60, 100)
|
|
unoccupied_nearer_b = box(50, 0, 80, 100)
|
|
points = np.array(
|
|
[
|
|
[20, 50], # postcode A, inside contained_a
|
|
[90, 50], # postcode B, outside unoccupied_nearer_b
|
|
]
|
|
)
|
|
postcodes = ["A", "B"]
|
|
|
|
fragments = process_oa(
|
|
oa_geom,
|
|
points,
|
|
postcodes,
|
|
inspire_candidates=[contained_a, unoccupied_nearer_b],
|
|
)
|
|
frag_dict = dict(fragments)
|
|
|
|
assert "A" in frag_dict and "B" in frag_dict
|
|
assert contained_a.difference(frag_dict["A"]).area < 0.01
|
|
assert frag_dict["A"].intersection(frag_dict["B"]).area < 0.01
|
|
assert frag_dict["B"].intersection(box(60, 0, 80, 100)).area > 0
|
|
|
|
def test_nearest_uses_assignable_fragment_after_contained_subtraction(self):
|
|
"""Nearest assignment should use the part left after priority subtraction."""
|
|
oa_geom = box(0, 0, 100, 100)
|
|
contained_a = box(0, 0, 60, 100)
|
|
unoccupied = box(25, 0, 80, 100)
|
|
points = np.array(
|
|
[
|
|
[20, 50], # postcode A, inside contained_a
|
|
[90, 50], # postcode B, nearest to unoccupied remainder
|
|
]
|
|
)
|
|
postcodes = ["A", "B"]
|
|
|
|
fragments = process_oa(
|
|
oa_geom,
|
|
points,
|
|
postcodes,
|
|
inspire_candidates=[contained_a, unoccupied],
|
|
)
|
|
frag_dict = dict(fragments)
|
|
|
|
assert contained_a.difference(frag_dict["A"]).area < 0.01
|
|
assert box(60, 0, 80, 100).difference(frag_dict["B"]).area < 0.01
|
|
|
|
def test_boundary_uprn_does_not_claim_adjacent_parcel(self):
|
|
"""A UPRN on a parcel edge should not count inside both parcels."""
|
|
oa_geom = box(0, 0, 100, 100)
|
|
left = box(0, 0, 50, 100)
|
|
right = box(50, 0, 100, 100)
|
|
points = np.array(
|
|
[
|
|
[50, 50], # postcode A, exactly on shared parcel boundary
|
|
[75, 50], # postcode B, strictly inside right parcel
|
|
]
|
|
)
|
|
postcodes = ["A", "B"]
|
|
|
|
fragments = process_oa(
|
|
oa_geom, points, postcodes, inspire_candidates=[left, right]
|
|
)
|
|
frag_dict = dict(fragments)
|
|
|
|
assert "A" in frag_dict and "B" in frag_dict
|
|
assert right.difference(frag_dict["B"]).area < 0.01
|
|
|
|
def test_disconnected_nearest_fragments_can_go_to_different_postcodes(self):
|
|
"""A split unoccupied parcel should be assigned component by component."""
|
|
oa_geom = box(0, 0, 100, 100)
|
|
contained_b = box(40, 0, 60, 100)
|
|
unoccupied = box(0, 40, 100, 60)
|
|
points = np.array(
|
|
[
|
|
[10, 20], # postcode A, nearest to left split fragment
|
|
[50, 20], # postcode B, inside contained_b but outside unoccupied
|
|
[90, 20], # postcode C, nearest to right split fragment
|
|
]
|
|
)
|
|
postcodes = ["A", "B", "C"]
|
|
|
|
fragments = process_oa(
|
|
oa_geom,
|
|
points,
|
|
postcodes,
|
|
inspire_candidates=[contained_b, unoccupied],
|
|
)
|
|
frag_dict = dict(fragments)
|
|
|
|
assert box(0, 40, 40, 60).difference(frag_dict["A"]).area < 0.01
|
|
assert box(60, 40, 100, 60).difference(frag_dict["C"]).area < 0.01
|
|
|
|
def test_overlapping_nearest_parcels_do_not_overlap_in_output(self):
|
|
"""Two unoccupied nearest-assigned parcels should be resolved cleanly."""
|
|
oa_geom = box(0, 0, 100, 100)
|
|
left = box(0, 0, 70, 100)
|
|
right = box(30, 0, 100, 100)
|
|
points = np.array(
|
|
[
|
|
[10, 50], # postcode A, nearest to left parcel
|
|
[90, 50], # postcode B, nearest to right parcel
|
|
]
|
|
)
|
|
postcodes = ["A", "B"]
|
|
|
|
fragments = process_oa(
|
|
oa_geom, points, postcodes, inspire_candidates=[left, right]
|
|
)
|
|
frag_dict = dict(fragments)
|
|
|
|
assert "A" in frag_dict and "B" in frag_dict
|
|
assert frag_dict["A"].intersection(frag_dict["B"]).area < 0.01
|
|
|
|
def test_mixed_inspire_and_voronoi_covers_oa_without_overlap(self):
|
|
"""Parcel claims plus Voronoi fallback should cover the whole OA."""
|
|
oa_geom = box(0, 0, 100, 100)
|
|
contained_a = box(0, 0, 30, 100)
|
|
unoccupied = box(70, 0, 90, 100)
|
|
points = np.array(
|
|
[
|
|
[10, 50],
|
|
[90, 50],
|
|
]
|
|
)
|
|
postcodes = ["A", "B"]
|
|
|
|
fragments = process_oa(
|
|
oa_geom,
|
|
points,
|
|
postcodes,
|
|
inspire_candidates=[contained_a, unoccupied],
|
|
)
|
|
geoms = [geom for _, geom in fragments]
|
|
covered = unary_union(geoms)
|
|
overlap = sum(geom.area for geom in geoms) - covered.area
|
|
|
|
assert covered.area == pytest.approx(oa_geom.area)
|
|
assert oa_geom.difference(covered).area < 0.01
|
|
assert overlap < 0.01
|
|
|
|
def test_inspire_parcel_straddling_oa_is_clipped(self):
|
|
"""INSPIRE parcels crossing the OA boundary should not leak outside it."""
|
|
oa_geom = box(0, 0, 100, 100)
|
|
straddling = box(80, 0, 140, 100)
|
|
points = np.array(
|
|
[
|
|
[10, 50],
|
|
[90, 50],
|
|
]
|
|
)
|
|
postcodes = ["A", "B"]
|
|
|
|
fragments = process_oa(
|
|
oa_geom, points, postcodes, inspire_candidates=[straddling]
|
|
)
|
|
|
|
for _, geom in fragments:
|
|
assert geom.difference(oa_geom).area < 0.01
|
|
|
|
def test_shared_parcel_keeps_every_contained_postcode(self):
|
|
"""A single parcel containing UPRNs for [A, A, B] must yield a fragment
|
|
for BOTH A and B. Pre-fix the majority winner (A) claimed the whole
|
|
parcel, excluding it from `remaining`, so B's UPRNs were trapped inside
|
|
claimed land and B vanished entirely (no fragment)."""
|
|
oa_geom = box(0, 0, 100, 100)
|
|
parcel = box(0, 0, 100, 100) # one parcel covering the whole OA
|
|
points = np.array(
|
|
[
|
|
[20, 50], # postcode A
|
|
[30, 50], # postcode A (majority)
|
|
[80, 50], # postcode B (minority — would be dropped pre-fix)
|
|
]
|
|
)
|
|
postcodes = ["A", "A", "B"]
|
|
|
|
fragments = process_oa(oa_geom, points, postcodes, inspire_candidates=[parcel])
|
|
frag_dict = dict(fragments)
|
|
|
|
assert "A" in frag_dict, "Majority postcode A must keep a fragment"
|
|
assert "B" in frag_dict, "Minority postcode B must not be dropped"
|
|
assert frag_dict["A"].area > 0
|
|
assert frag_dict["B"].area > 0
|
|
# The split must partition the parcel without overlap.
|
|
assert frag_dict["A"].intersection(frag_dict["B"]).area < 0.01
|
|
|
|
|
|
class TestProcessOASeedFootprintGuarantee:
|
|
"""Every postcode with a UPRN seed in a multi-postcode OA must produce a
|
|
fragment, even when its partition cell collapses below MIN_GEOM_AREA — an
|
|
active postcode must never be dropped (validate_outputs is zero-tolerance)."""
|
|
|
|
def test_collapsed_voronoi_cells_rescued_as_footprints(self):
|
|
# OA just above MIN_GEOM_AREA; a 2-way Voronoi split leaves each cell
|
|
# (~0.0098 m²) below MIN_GEOM_AREA, so both would be dropped without rescue.
|
|
oa_geom = box(0, 0, 0.14, 0.14) # 0.0196 m²
|
|
points = np.array([[0.035, 0.07], [0.105, 0.07]])
|
|
postcodes = ["AA1 1AA", "AA1 1AB"]
|
|
|
|
fragments = process_oa(oa_geom, points, postcodes, inspire_candidates=[])
|
|
got = {pc for pc, _ in fragments}
|
|
assert got == {"AA1 1AA", "AA1 1AB"}, f"dropped: {set(postcodes) - got}"
|
|
for _, geom in fragments:
|
|
assert geom.is_valid
|
|
assert geom.area > MIN_GEOM_AREA
|
|
|
|
def test_seed_footprint_sits_on_the_uprn(self):
|
|
from shapely.geometry import Point
|
|
|
|
from .process_oa import _seed_footprints
|
|
|
|
oa_geom = box(0, 0, 1000, 1000)
|
|
points = np.array([[500.0, 500.0]])
|
|
out = _seed_footprints({"AA1 1AA"}, points, ["AA1 1AA"], oa_geom)
|
|
|
|
assert len(out) == 1
|
|
pc, geom = out[0]
|
|
assert pc == "AA1 1AA"
|
|
assert geom.is_valid and geom.area > MIN_GEOM_AREA
|
|
assert geom.contains(Point(500, 500))
|
|
|
|
def test_only_orphans_get_footprints(self):
|
|
# A wins real area; B's lone seed collapses. Only B should be rescued, and
|
|
# A's genuine (large) fragment must be untouched by the rescue.
|
|
oa_geom = box(0, 0, 0.14, 0.14)
|
|
points = np.array([[0.07, 0.07], [0.07, 0.07], [0.105, 0.07]])
|
|
postcodes = ["AA1 1AA", "AA1 1AA", "AA1 1AB"]
|
|
fragments = process_oa(oa_geom, points, postcodes, inspire_candidates=[])
|
|
assert {pc for pc, _ in fragments} == {"AA1 1AA", "AA1 1AB"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _extract_polygonal helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestExtractPolygonal:
|
|
"""_extract_polygonal must strip non-polygon parts from geometry collections."""
|
|
|
|
def test_polygon_passthrough(self):
|
|
poly = box(0, 0, 10, 10)
|
|
assert _extract_polygonal(poly) is poly
|
|
|
|
def test_multipolygon_passthrough(self):
|
|
mp = MultiPolygon([box(0, 0, 10, 10), box(20, 20, 30, 30)])
|
|
assert _extract_polygonal(mp) is mp
|
|
|
|
def test_geometry_collection_extracts_polygon(self):
|
|
from shapely.geometry import GeometryCollection, LineString
|
|
|
|
poly = box(0, 0, 10, 10)
|
|
line = LineString([(0, 0), (10, 10)])
|
|
gc = GeometryCollection([poly, line])
|
|
result = _extract_polygonal(gc)
|
|
assert result is not None
|
|
assert result.geom_type == "Polygon"
|
|
assert result.area == pytest.approx(poly.area)
|
|
|
|
def test_geometry_collection_no_polygons_returns_none(self):
|
|
from shapely.geometry import GeometryCollection, LineString, Point
|
|
|
|
gc = GeometryCollection([LineString([(0, 0), (1, 1)]), Point(5, 5)])
|
|
assert _extract_polygonal(gc) is None
|
|
|
|
def test_line_returns_none(self):
|
|
from shapely.geometry import LineString
|
|
|
|
assert _extract_polygonal(LineString([(0, 0), (1, 1)])) is None
|
|
|
|
def test_overlapping_collection_unioned_to_valid(self):
|
|
"""A GeometryCollection with OVERLAPPING polygons must be unioned into a
|
|
VALID geometry (not a raw MultiPolygon, which would be invalid and crash
|
|
the next .difference()), and must not double-count the overlap area."""
|
|
from shapely.geometry import GeometryCollection
|
|
|
|
a = box(0, 0, 100, 100)
|
|
b = box(50, 50, 150, 150) # overlaps a by 50x50
|
|
result = _extract_polygonal(GeometryCollection([a, b]))
|
|
assert result is not None
|
|
assert result.is_valid
|
|
assert result.area == pytest.approx(unary_union([a, b]).area)
|
|
# And the formerly-crashing op now works:
|
|
assert result.difference(box(0, 0, 10, 10)).is_valid
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Edge case: merge_fragments handles single-OA postcodes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMergeFragments:
|
|
"""merge_fragments must handle edge cases cleanly."""
|
|
|
|
def test_single_fragment_passthrough(self):
|
|
"""A postcode with one fragment should pass through unchanged."""
|
|
poly = box(0, 0, 100, 100)
|
|
result = merge_fragments([("AA1 1AA", poly)])
|
|
assert "AA1 1AA" in result
|
|
assert result["AA1 1AA"].equals(poly)
|
|
|
|
def test_empty_fragments_excluded(self):
|
|
"""Empty geometries should not appear in output."""
|
|
empty = Polygon()
|
|
result = merge_fragments([("AA1 1AA", empty)])
|
|
assert "AA1 1AA" not in result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Edge case: to_wgs84_geojson handles degenerate geometries
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestToWgs84Geojson:
|
|
"""to_wgs84_geojson must handle edge cases."""
|
|
|
|
def test_empty_geometry_returns_none(self):
|
|
assert to_wgs84_geojson(Polygon()) is None
|
|
|
|
def test_valid_polygon_returns_geojson(self):
|
|
# Small square in BNG
|
|
poly = box(530000, 180000, 530100, 180100)
|
|
result = to_wgs84_geojson(poly)
|
|
assert result is not None
|
|
assert result["type"] == "Polygon"
|
|
assert len(result["coordinates"]) >= 1
|
|
assert len(result["coordinates"][0]) >= 4 # closed ring
|
|
|
|
def test_multipolygon_returns_largest(self):
|
|
"""MultiPolygon input should return only the largest polygon."""
|
|
big = box(530000, 180000, 530100, 180100)
|
|
small = box(530200, 180200, 530210, 180210)
|
|
mp = MultiPolygon([big, small])
|
|
result = to_wgs84_geojson(mp)
|
|
assert result is not None
|
|
assert result["type"] == "Polygon"
|
|
|
|
def test_coordinates_have_limited_precision(self):
|
|
"""GeoJSON coordinates should be rounded to 6 decimal places."""
|
|
import json
|
|
|
|
poly = box(530000, 180000, 530100, 180100)
|
|
result = to_wgs84_geojson(poly)
|
|
assert result is not None
|
|
# Check precision via JSON serialization (what actually hits disk)
|
|
for lon, lat in result["coordinates"][0]:
|
|
lon_s = json.dumps(lon)
|
|
lat_s = json.dumps(lat)
|
|
lon_dp = len(lon_s.split(".")[1]) if "." in lon_s else 0
|
|
lat_dp = len(lat_s.split(".")[1]) if "." in lat_s else 0
|
|
assert lon_dp <= 6, f"Longitude {lon_s} has {lon_dp} decimal places"
|
|
assert lat_dp <= 6, f"Latitude {lat_s} has {lat_dp} decimal places"
|
|
|
|
def test_write_district_geojson_replaces_stale_units(self, tmp_path):
|
|
stale_units = tmp_path / "units"
|
|
stale_units.mkdir()
|
|
(stale_units / "ZZ1.geojson").write_text(
|
|
json.dumps({"type": "FeatureCollection", "features": []})
|
|
)
|
|
|
|
file_count = write_district_geojson(
|
|
{"AA1 1AA": box(530000, 180000, 530100, 180100)}, tmp_path
|
|
)
|
|
|
|
assert file_count == 1
|
|
assert not (stale_units / "ZZ1.geojson").exists()
|
|
written = json.loads((stale_units / "AA1.geojson").read_text())
|
|
assert written["features"][0]["properties"]["postcodes"] == "AA1 1AA"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Edge case: parse_gpkg_geometry rejects unknown envelope types
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestParseGpkgGeometry:
|
|
"""parse_gpkg_geometry must raise on unknown envelope types."""
|
|
|
|
def test_unknown_envelope_type_raises(self):
|
|
# Build a minimal GeoPackage blob with envelope_type=5
|
|
# Byte 3 = flags: envelope_type in bits 3-1, so type 5 = 0b00001010
|
|
blob = bytes([0x47, 0x50, 0x00, 0b00001010]) + b"\x00" * 100
|
|
with pytest.raises(ValueError, match="Unknown GeoPackage envelope type 5"):
|
|
parse_gpkg_geometry(blob)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _fill_holes removes interior rings
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestFillHoles:
|
|
"""_fill_holes fills small artifact holes but keeps large (real-enclosed) ones."""
|
|
|
|
def test_small_artifact_hole_filled(self):
|
|
"""A small (<1000 m²) interior ring is an artifact and gets filled."""
|
|
outer = [(0, 0), (100, 0), (100, 100), (0, 100), (0, 0)]
|
|
hole = [(40, 40), (60, 40), (60, 60), (40, 60), (40, 40)] # 20x20 = 400 m²
|
|
poly_with_hole = Polygon(outer, [hole])
|
|
assert len(list(poly_with_hole.interiors)) == 1
|
|
result = _fill_holes(poly_with_hole)
|
|
assert result.geom_type == "Polygon"
|
|
assert len(list(result.interiors)) == 0
|
|
assert result.area == pytest.approx(Polygon(outer).area)
|
|
|
|
def test_large_hole_kept(self):
|
|
"""A large (>=1000 m²) hole is likely a real enclosed postcode — keep it."""
|
|
outer = [(0, 0), (100, 0), (100, 100), (0, 100), (0, 0)]
|
|
hole = [(20, 20), (80, 20), (80, 80), (20, 80), (20, 20)] # 60x60 = 3600 m²
|
|
poly_with_hole = Polygon(outer, [hole])
|
|
result = _fill_holes(poly_with_hole)
|
|
assert len(list(result.interiors)) == 1
|
|
assert result.area == pytest.approx(10000 - 3600)
|
|
|
|
def test_multipolygon_with_holes(self):
|
|
"""A MultiPolygon where each part has holes should have all holes removed."""
|
|
outer1 = [(0, 0), (50, 0), (50, 50), (0, 50), (0, 0)]
|
|
hole1 = [(10, 10), (20, 10), (20, 20), (10, 20), (10, 10)]
|
|
outer2 = [(60, 60), (110, 60), (110, 110), (60, 110), (60, 60)]
|
|
hole2 = [(70, 70), (80, 70), (80, 80), (70, 80), (70, 70)]
|
|
mp = MultiPolygon([Polygon(outer1, [hole1]), Polygon(outer2, [hole2])])
|
|
result = _fill_holes(mp)
|
|
assert result.geom_type == "MultiPolygon"
|
|
for p in result.geoms:
|
|
assert len(list(p.interiors)) == 0
|
|
|
|
def test_polygon_without_hole_unchanged(self):
|
|
"""A polygon with no holes should pass through unchanged."""
|
|
poly = box(0, 0, 100, 100)
|
|
result = _fill_holes(poly)
|
|
assert result.area == pytest.approx(poly.area)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Improved merge with 5m buffer closes 3m gaps
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMergeImprovedBuffer:
|
|
"""The 5m buffer should close gaps that the old 1m buffer could not."""
|
|
|
|
def test_3m_gap_merged(self):
|
|
"""Two fragments with a 3m gap should merge into a single polygon."""
|
|
left = box(0, 0, 50, 100)
|
|
right = box(53, 0, 100, 100) # 3m gap at x=50..53
|
|
result = merge_fragments([("AA1 1AA", left), ("AA1 1AA", right)])
|
|
assert "AA1 1AA" in result
|
|
geom = result["AA1 1AA"]
|
|
assert geom.geom_type == "Polygon", (
|
|
f"Expected single Polygon after merging 3m gap, got {geom.geom_type}"
|
|
)
|
|
|
|
def test_holes_removed_after_merge(self):
|
|
"""Interior holes created by merging should be filled."""
|
|
# Create a donut-like shape from fragments
|
|
outer = box(0, 0, 100, 100)
|
|
inner = box(30, 30, 70, 70)
|
|
ring = outer.difference(inner)
|
|
# Add the inner piece as a separate fragment
|
|
result = merge_fragments([("AA1 1AA", ring), ("AA1 1AA", inner)])
|
|
assert "AA1 1AA" in result
|
|
geom = result["AA1 1AA"]
|
|
assert len(list(geom.interiors)) == 0, "Merged polygon should have no holes"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# subtract_greenspace
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSubtractGreenspace:
|
|
"""subtract_greenspace must remove park/water area from postcode polygons."""
|
|
|
|
def test_park_subtracted(self):
|
|
"""A park overlapping a postcode should reduce its area."""
|
|
from shapely.strtree import STRtree
|
|
|
|
postcode = box(0, 0, 100, 100) # 10000 sqm
|
|
park = box(60, 0, 100, 100) # 4000 sqm overlap on the right
|
|
tree = STRtree([park])
|
|
geoms = [park]
|
|
result = subtract_greenspace(postcode, tree, geoms)
|
|
# Should have lost ~4000 sqm
|
|
assert result.area == pytest.approx(6000, rel=0.01)
|
|
|
|
def test_no_greenspace_unchanged(self):
|
|
"""With no overlapping greenspace, the geometry should be unchanged."""
|
|
from shapely.strtree import STRtree
|
|
|
|
postcode = box(0, 0, 100, 100)
|
|
park = box(200, 200, 300, 300) # far away
|
|
tree = STRtree([park])
|
|
geoms = [park]
|
|
result = subtract_greenspace(postcode, tree, geoms)
|
|
assert result.area == pytest.approx(postcode.area)
|
|
|
|
def test_full_overlap_preserves_postcode(self):
|
|
"""If greenspace covers the entire postcode, keep the original."""
|
|
from shapely.strtree import STRtree
|
|
|
|
postcode = box(0, 0, 100, 100)
|
|
park = box(-10, -10, 110, 110) # completely covers postcode
|
|
tree = STRtree([park])
|
|
geoms = [park]
|
|
result = subtract_greenspace(postcode, tree, geoms)
|
|
# Should keep original since subtraction would erase entirely
|
|
assert result.area == pytest.approx(postcode.area)
|
|
|
|
def test_over_90pct_removal_preserves_postcode(self):
|
|
"""If greenspace would remove >90% of area, keep the original."""
|
|
from shapely.strtree import STRtree
|
|
|
|
postcode = box(0, 0, 100, 100) # 10000 sqm
|
|
park = box(5, 0, 100, 100) # 9500 sqm overlap = 95% removal
|
|
tree = STRtree([park])
|
|
geoms = [park]
|
|
result = subtract_greenspace(postcode, tree, geoms)
|
|
# Should keep original since >90% would be removed
|
|
assert result.area == pytest.approx(postcode.area)
|
|
|
|
def test_under_90pct_removal_subtracts(self):
|
|
"""If greenspace removes <90%, subtraction should proceed."""
|
|
from shapely.strtree import STRtree
|
|
|
|
postcode = box(0, 0, 100, 100) # 10000 sqm
|
|
park = box(20, 0, 100, 100) # 8000 sqm overlap = 80% removal
|
|
tree = STRtree([park])
|
|
geoms = [park]
|
|
result = subtract_greenspace(postcode, tree, geoms)
|
|
# 80% < 90% cap, so subtraction should happen
|
|
assert result.area == pytest.approx(2000, rel=0.01)
|
|
|
|
|
|
class TestToWgs84GeojsonValidity:
|
|
"""to_wgs84_geojson must emit GeoJSON that round-trips to a valid geometry."""
|
|
|
|
def test_geojson_round_trips_to_valid_geometry(self):
|
|
from shapely.geometry import shape
|
|
|
|
geojson = to_wgs84_geojson(box(530000, 180000, 530100, 180100))
|
|
assert geojson is not None
|
|
rt = shape(geojson)
|
|
assert not rt.is_empty
|
|
assert rt.is_valid
|
|
|
|
def test_written_district_features_are_all_valid(self, tmp_path):
|
|
from shapely.geometry import shape
|
|
|
|
postcodes = {
|
|
"AA1 1AA": box(530000, 180000, 530100, 180100),
|
|
"AA1 1AB": MultiPolygon(
|
|
[
|
|
box(530200, 180000, 530250, 180050),
|
|
box(530200, 180060, 530250, 180110),
|
|
]
|
|
),
|
|
}
|
|
assert write_district_geojson(postcodes, tmp_path) == 1
|
|
collection = json.loads((tmp_path / "units" / "AA1.geojson").read_text())
|
|
for feature in collection["features"]:
|
|
geom = shape(feature["geometry"])
|
|
assert geom.is_valid
|
|
assert not geom.is_empty
|
|
|
|
|
|
class TestGreenspaceHolePreserved:
|
|
"""Interior holes carved by greenspace subtraction must survive merge_fragments
|
|
(the post-subtraction _fill_holes that previously negated them was removed)."""
|
|
|
|
def test_interior_lake_hole_survives_merge_fragments(self):
|
|
from shapely.strtree import STRtree
|
|
|
|
postcode = box(0, 0, 100, 100) # 10000 sqm
|
|
lake = box(30, 30, 70, 70) # 1600 sqm fully-interior hole (16% removal)
|
|
result = merge_fragments(
|
|
[("TEST1", postcode)],
|
|
greenspace_tree=STRtree([lake]),
|
|
greenspace_geoms=[lake],
|
|
)
|
|
merged = result["TEST1"]
|
|
assert len(list(merged.interiors)) == 1
|
|
assert merged.area == pytest.approx(10000 - 1600, rel=0.05)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# merge_fragments keeps substantial detached parts (no OA-seam coverage gaps)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestKeepDetachedParts:
|
|
"""A postcode split across an OA seam (railway/river) must keep both parts
|
|
instead of dropping all but the largest, which left ~1.8% uncovered gaps."""
|
|
|
|
def test_far_apart_parts_both_kept(self):
|
|
# Two 50x50m blocks 30m apart — wider than the 10m merge buffer.
|
|
a = box(0, 0, 50, 50) # 2500 m²
|
|
b = box(80, 0, 130, 50) # 2500 m², 30m gap
|
|
geom = merge_fragments([("AA1 1AA", a), ("AA1 1AA", b)])["AA1 1AA"]
|
|
assert geom.geom_type == "MultiPolygon"
|
|
assert len(geom.geoms) == 2
|
|
assert geom.area == pytest.approx(5000, rel=0.01)
|
|
|
|
def test_tiny_noise_part_dropped(self):
|
|
main = box(0, 0, 100, 100) # 10000 m²
|
|
noise = box(200, 200, 205, 205) # 25 m² < 100 m² threshold
|
|
geom = merge_fragments([("AA1 1AA", main), ("AA1 1AA", noise)])["AA1 1AA"]
|
|
assert geom.geom_type == "Polygon"
|
|
assert geom.area == pytest.approx(10000, rel=0.01)
|
|
|
|
|
|
class TestMultiPolygonOutput:
|
|
"""to_wgs84_geojson_multi / the writer must emit MultiPolygon for split
|
|
postcodes (the Rust server + loader already parse MultiPolygon)."""
|
|
|
|
def test_multipolygon_preserves_all_parts(self):
|
|
from shapely.geometry import shape
|
|
|
|
mp = MultiPolygon(
|
|
[
|
|
box(530000, 180000, 530100, 180100),
|
|
box(531000, 180000, 531100, 180100),
|
|
]
|
|
)
|
|
gj = to_wgs84_geojson_multi(mp)
|
|
assert gj["type"] == "MultiPolygon"
|
|
assert len(gj["coordinates"]) == 2
|
|
rt = shape(gj)
|
|
assert rt.is_valid and not rt.is_empty
|
|
assert len(rt.geoms) == 2
|
|
|
|
def test_single_part_stays_polygon(self):
|
|
gj = to_wgs84_geojson_multi(box(530000, 180000, 530100, 180100))
|
|
assert gj["type"] == "Polygon"
|
|
|
|
def test_writer_emits_multipolygon_feature(self, tmp_path):
|
|
mp = MultiPolygon(
|
|
[
|
|
box(530000, 180000, 530100, 180100),
|
|
box(531000, 180000, 531100, 180100),
|
|
]
|
|
)
|
|
assert write_district_geojson({"AA1 1AA": mp}, tmp_path) == 1
|
|
coll = json.loads((tmp_path / "units" / "AA1.geojson").read_text())
|
|
assert coll["features"][0]["geometry"]["type"] == "MultiPolygon"
|
|
|
|
|
|
class TestOutputPartition:
|
|
"""The writer must emit a partition: overlapping postcodes are made disjoint
|
|
(no two cover the same ground) without dropping an active postcode."""
|
|
|
|
def test_overlapping_postcodes_made_disjoint(self, tmp_path):
|
|
from shapely.geometry import shape
|
|
|
|
a = box(530000, 180000, 530100, 180100)
|
|
b = box(530090, 180000, 530200, 180100) # overlaps `a` in a 10m strip
|
|
assert a.intersection(b).area > 0 # precondition: they overlap
|
|
|
|
write_district_geojson({"AA1 1AA": a, "AA1 1AB": b}, tmp_path)
|
|
coll = json.loads((tmp_path / "units" / "AA1.geojson").read_text())
|
|
geoms = {
|
|
f["properties"]["postcodes"]: shape(f["geometry"])
|
|
for f in coll["features"]
|
|
}
|
|
assert set(geoms) == {"AA1 1AA", "AA1 1AB"} # neither dropped
|
|
# Disjoint interiors (share at most an edge).
|
|
assert geoms["AA1 1AA"].intersection(geoms["AA1 1AB"]).area == pytest.approx(
|
|
0.0, abs=1e-12
|
|
)
|
|
assert all(g.area > 0 for g in geoms.values())
|
|
|
|
def test_enclosed_postcode_makes_container_a_donut(self, tmp_path):
|
|
"""A postcode fully INSIDE another must stay disjoint: the smaller (inner)
|
|
keeps its area, the container gets a hole. A plain `overlaps` query misses
|
|
containment, so this is the regression guard for that fix."""
|
|
from shapely.geometry import shape
|
|
|
|
outer = box(530000, 180000, 530300, 180300) # 90,000 m²
|
|
inner = box(530100, 180100, 530200, 180200) # 10,000 m², fully inside outer
|
|
assert outer.contains(inner) # precondition
|
|
|
|
write_district_geojson({"AA1 1AA": outer, "AA1 1AB": inner}, tmp_path)
|
|
coll = json.loads((tmp_path / "units" / "AA1.geojson").read_text())
|
|
geoms = {
|
|
f["properties"]["postcodes"]: shape(f["geometry"])
|
|
for f in coll["features"]
|
|
}
|
|
assert set(geoms) == {"AA1 1AA", "AA1 1AB"} # neither dropped
|
|
assert geoms["AA1 1AA"].intersection(geoms["AA1 1AB"]).area == pytest.approx(
|
|
0.0, abs=1e-12
|
|
)
|
|
# Container is now a donut around the enclosed postcode.
|
|
assert geoms["AA1 1AA"].geom_type == "Polygon"
|
|
assert len(list(geoms["AA1 1AA"].interiors)) == 1
|
|
assert geoms["AA1 1AB"].area > 0
|
|
|
|
|
|
class TestGeojsonGeometrySliverValidity:
|
|
"""_geojson_geometry must emit geometry that is still valid after the final 6dp
|
|
rounding. A sub-grid overlap sliver (left by _resolve_overlaps' full-precision
|
|
difference) must be snapped away on the output grid rather than collapsed by a
|
|
naive round into a degenerate ('Too few points') / self-intersecting ring that
|
|
only fails once read back from disk."""
|
|
|
|
def test_subgrid_sliver_does_not_produce_invalid_ring(self):
|
|
from shapely.geometry import shape
|
|
from shapely.validation import explain_validity
|
|
|
|
from .output import _geojson_geometry
|
|
|
|
main = box(-0.34, 51.74, -0.33, 51.75) # ~degree-scale, valid
|
|
# A thin triangle whose three vertices all round to the same 1e-6 cell.
|
|
sliver = Polygon(
|
|
[
|
|
(-0.3400284, 51.7505061),
|
|
(-0.3400280, 51.7505060),
|
|
(-0.3400281, 51.7505063),
|
|
]
|
|
)
|
|
assert sliver.is_valid
|
|
|
|
gj = _geojson_geometry(MultiPolygon([main, sliver]))
|
|
assert gj is not None
|
|
rt = shape(gj)
|
|
assert rt.is_valid, explain_validity(rt)
|
|
assert not rt.is_empty
|
|
# The real (main) area is preserved; only the degenerate sliver is dropped.
|
|
assert rt.area == pytest.approx(main.area, rel=1e-3)
|
|
|
|
def test_pure_sliver_is_rescued_not_written_invalid(self):
|
|
from shapely.geometry import shape
|
|
|
|
from .output import _OUTPUT_PRECISION_DEG, _geojson_geometry
|
|
|
|
# A sub-grid sliver (extent < 1e-6° in one dimension) that snaps to empty:
|
|
# it must be rescued into a minimal valid footprint at its location, never
|
|
# written invalid and never dropped (an active postcode keeps a boundary).
|
|
g = _OUTPUT_PRECISION_DEG
|
|
sliver = Polygon(
|
|
[
|
|
(-0.30, 51.75),
|
|
(-0.30 + 5 * g, 51.75),
|
|
(-0.30 + 5 * g, 51.75 + 0.2 * g),
|
|
(-0.30, 51.75 + 0.1 * g),
|
|
]
|
|
)
|
|
gj = _geojson_geometry(sliver)
|
|
assert gj is not None # rescued, not dropped
|
|
rt = shape(gj)
|
|
assert rt.is_valid and not rt.is_empty
|
|
assert rt.distance(sliver) == pytest.approx(0.0, abs=1e-4)
|
|
|
|
|
|
class TestColocatedPostcodesAllRetained:
|
|
"""Co-located non-geographic postcodes (e.g. AL1 9xx) have heavily-overlapping
|
|
tiny footprints; the de-overlap pass trims most to sub-grid slivers. None may
|
|
be dropped — every active postcode must keep a (valid, non-empty) boundary."""
|
|
|
|
def test_overlapping_tiny_footprints_none_dropped(self, tmp_path):
|
|
from shapely.geometry import Point, shape
|
|
|
|
# 12 ~3 m discs within ~1 m of each other → after de-overlap most collapse
|
|
# below output precision.
|
|
postcodes = {}
|
|
for i in range(12):
|
|
e = 514000.0 + (i % 4) * 0.3
|
|
n = 206000.0 + (i // 4) * 0.3
|
|
postcodes[f"AL1 9{chr(65 + i)}{chr(65 + i)}"] = Point(e, n).buffer(3.0)
|
|
|
|
write_district_geojson(postcodes, tmp_path)
|
|
coll = json.loads((tmp_path / "units" / "AL1.geojson").read_text())
|
|
written = {f["properties"]["postcodes"] for f in coll["features"]}
|
|
assert written == set(postcodes), f"dropped: {set(postcodes) - written}"
|
|
for feature in coll["features"]:
|
|
geom = shape(feature["geometry"])
|
|
assert geom.is_valid and not geom.is_empty
|
|
|
|
|
|
class TestSafeOverlayHelpers:
|
|
"""The robust overlay helpers retry on a fixed-precision grid after a
|
|
GEOSException (e.g. ``side location conflict`` from near-coincident OA-seam
|
|
edges). The grid is a caller-supplied parameter: metres for the BNG stages,
|
|
1e-6 degrees for the WGS84 output stage — so the same helper serves both
|
|
without crushing degree-scale shapes on the metre default."""
|
|
|
|
def test_grid_param_honored_on_clean_inputs(self):
|
|
from . import geometry
|
|
|
|
a = box(0, 0, 10, 10)
|
|
b = box(5, 0, 15, 10)
|
|
# No exception → exact result, identical regardless of the fallback grid.
|
|
assert geometry.safe_difference(a, b, grid=1e-6).equals(a.difference(b))
|
|
assert geometry.safe_union([a, b], grid=1e-6).equals(unary_union([a, b]))
|
|
assert geometry.safe_intersection(a, b, grid=1e-6).equals(a.intersection(b))
|
|
|
|
def test_difference_falls_back_to_fixed_precision_overlay(self, monkeypatch):
|
|
from shapely import GEOSException
|
|
from shapely.geometry.base import BaseGeometry
|
|
|
|
from . import geometry
|
|
|
|
a = box(0, 0, 10, 10)
|
|
b = box(5, 0, 15, 10)
|
|
|
|
def _boom(self, other, *args, **kwargs):
|
|
raise GEOSException("forced side location conflict")
|
|
|
|
# Patch the .difference() *method*; the fallback uses the top-level
|
|
# shapely.difference() function, so it still completes.
|
|
monkeypatch.setattr(BaseGeometry, "difference", _boom)
|
|
result = geometry.safe_difference(a, b, grid=1e-6)
|
|
assert result.is_valid
|
|
assert result.area == pytest.approx(50.0) # left half of `a`
|
|
|
|
def test_union_falls_back_to_fixed_precision_overlay(self, monkeypatch):
|
|
from shapely import GEOSException
|
|
|
|
from . import geometry
|
|
|
|
a = box(0, 0, 10, 10)
|
|
b = box(5, 0, 15, 10)
|
|
|
|
def _boom(_geoms):
|
|
raise GEOSException("forced robustness failure")
|
|
|
|
monkeypatch.setattr(geometry, "unary_union", _boom)
|
|
result = geometry.safe_union([a, b], grid=1e-6)
|
|
assert result.is_valid
|
|
assert result.area == pytest.approx(150.0) # 100 + 100 - 50 overlap
|
|
|
|
# A self-intersecting bow-tie: invalid. set_precision()'s DEFAULT
|
|
# ('valid_output') mode runs its own noding pass that re-raises
|
|
# 'side location conflict' on this — which is exactly how the production
|
|
# crash happened (the fallback re-raised the error it was meant to absorb).
|
|
_BOWTIE = Polygon([(0, 0), (1e-5, 1e-5), (1e-5, 0), (0, 1e-5), (0, 0)])
|
|
# make_valid() of this spiky ring returns a mixed-dimension
|
|
# GeometryCollection (polygon + dangling line); OverlayNG rejects that with
|
|
# 'Overlay input is mixed-dimension', so the fallback must strip the debris.
|
|
_SPIKY = Polygon(
|
|
[(0, 0), (10, 0), (10, 10), (5, 5), (5.0000001, 5), (0, 10), (0, 0), (15, -5), (0, 0)]
|
|
)
|
|
|
|
def test_difference_fallback_survives_invalid_input(self, monkeypatch):
|
|
"""Regression for the production crash: the fallback must not reduce
|
|
precision with set_precision's default valid_output mode, which re-raises
|
|
'side location conflict' on an invalid geometry."""
|
|
from shapely import GEOSException
|
|
from shapely.geometry.base import BaseGeometry
|
|
|
|
from . import geometry
|
|
|
|
assert not self._BOWTIE.is_valid # precondition
|
|
|
|
def _boom(self, other, *a, **k):
|
|
raise GEOSException("forced side location conflict")
|
|
|
|
monkeypatch.setattr(BaseGeometry, "difference", _boom)
|
|
result = geometry.safe_difference(self._BOWTIE, box(0, 0, 5e-6, 5e-6), grid=1e-6)
|
|
assert result.is_valid
|
|
assert result.geom_type in ("Polygon", "MultiPolygon")
|
|
|
|
def test_difference_fallback_survives_make_valid_geometrycollection(
|
|
self, monkeypatch
|
|
):
|
|
"""Regression: make_valid can yield a mixed-dimension GeometryCollection
|
|
that OverlayNG rejects; the fallback must keep only polygonal parts."""
|
|
from shapely import GEOSException, make_valid
|
|
from shapely.geometry.base import BaseGeometry
|
|
|
|
from . import geometry
|
|
|
|
assert make_valid(self._SPIKY).geom_type == "GeometryCollection" # precondition
|
|
|
|
def _boom(self, other, *a, **k):
|
|
raise GEOSException("forced side location conflict")
|
|
|
|
monkeypatch.setattr(BaseGeometry, "difference", _boom)
|
|
result = geometry.safe_difference(self._SPIKY, box(2, 2, 8, 8), grid=1e-4)
|
|
assert result.is_valid
|
|
assert result.geom_type in ("Polygon", "MultiPolygon")
|
|
assert result.area > 0
|
|
|
|
def test_intersection_fallback_survives_make_valid_geometrycollection(
|
|
self, monkeypatch
|
|
):
|
|
from shapely import GEOSException
|
|
from shapely.geometry.base import BaseGeometry
|
|
|
|
from . import geometry
|
|
|
|
def _boom(self, other, *a, **k):
|
|
raise GEOSException("forced side location conflict")
|
|
|
|
monkeypatch.setattr(BaseGeometry, "intersection", _boom)
|
|
result = geometry.safe_intersection(self._SPIKY, box(2, 2, 8, 8), grid=1e-4)
|
|
assert result.is_valid
|
|
assert result.geom_type in ("Polygon", "MultiPolygon")
|
|
assert result.area > 0
|
|
|
|
def test_union_fallback_survives_invalid_and_collection_inputs(self, monkeypatch):
|
|
"""The union fallback must absorb both an invalid bow-tie and a
|
|
make_valid-becomes-GeometryCollection input without raising."""
|
|
from shapely import GEOSException
|
|
|
|
from . import geometry
|
|
|
|
def _boom(_geoms):
|
|
raise GEOSException("forced robustness failure")
|
|
|
|
monkeypatch.setattr(geometry, "unary_union", _boom)
|
|
result = geometry.safe_union(
|
|
[self._BOWTIE, self._SPIKY, box(20, 20, 30, 30)], grid=1e-4
|
|
)
|
|
assert result.is_valid
|
|
assert result.geom_type in ("Polygon", "MultiPolygon")
|
|
|
|
def test_helpers_never_raise_on_empty_inputs(self):
|
|
"""Degenerate/empty inputs must not abort a multi-hour run."""
|
|
from . import geometry
|
|
|
|
empty = Polygon()
|
|
a = box(0, 0, 10, 10)
|
|
assert geometry.safe_difference(empty, a).is_empty
|
|
assert geometry.safe_difference(a, empty).equals(a)
|
|
assert geometry.safe_intersection(empty, a).is_empty
|
|
assert geometry.safe_union([]).is_empty
|
|
assert geometry.safe_union([empty]).is_empty
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# InspireIndex must return the same candidates as a brute-force bbox scan
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestInspireIndex:
|
|
"""The grid index replaces a per-OA linear scan of all parcel bboxes; it must
|
|
return an identical candidate set (and order) so Phase 3 output is unchanged."""
|
|
|
|
@staticmethod
|
|
def _brute(bboxes, box):
|
|
e0, n0, e1, n1 = box
|
|
mask = (
|
|
(bboxes[:, 2] >= e0)
|
|
& (bboxes[:, 0] <= e1)
|
|
& (bboxes[:, 3] >= n0)
|
|
& (bboxes[:, 1] <= n1)
|
|
)
|
|
return np.where(mask)[0]
|
|
|
|
def test_matches_brute_force_over_random_queries(self):
|
|
rng = np.random.default_rng(0)
|
|
x = rng.uniform(0, 10000, 5000)
|
|
y = rng.uniform(0, 10000, 5000)
|
|
w = rng.uniform(1, 60, 5000) # all <= 500m cell → CSR path
|
|
h = rng.uniform(1, 60, 5000)
|
|
bboxes = np.column_stack([x, y, x + w, y + h]).astype(np.float64)
|
|
idx = build_inspire_index(bboxes, None, None, cell_size=500.0)
|
|
|
|
for _ in range(400):
|
|
cx, cy = rng.uniform(0, 10000), rng.uniform(0, 10000)
|
|
sz = float(rng.choice([30.0, 200.0, 1000.0, 3000.0]))
|
|
box = (cx, cy, cx + sz, cy + sz)
|
|
got = idx.candidate_indices(box)
|
|
expected = np.sort(self._brute(bboxes, box))
|
|
assert np.array_equal(got, expected)
|
|
|
|
def test_oversized_parcel_is_found(self):
|
|
# A parcel larger than a cell goes to the overflow list, not the grid;
|
|
# a query deep inside it (away from the small parcels) must still find it.
|
|
bboxes = np.array(
|
|
[
|
|
[0.0, 0.0, 5000.0, 5000.0], # 5km parcel >> 500m cell
|
|
[100.0, 100.0, 120.0, 120.0],
|
|
[4000.0, 4000.0, 4020.0, 4020.0],
|
|
]
|
|
)
|
|
idx = build_inspire_index(bboxes, None, None, cell_size=500.0)
|
|
box = (2000.0, 2000.0, 2050.0, 2050.0)
|
|
got = idx.candidate_indices(box)
|
|
assert 0 in got
|
|
assert np.array_equal(got, np.sort(self._brute(bboxes, box)))
|
|
|
|
def test_no_overlap_returns_empty(self):
|
|
bboxes = np.array([[0.0, 0.0, 10.0, 10.0], [20.0, 20.0, 30.0, 30.0]])
|
|
idx = build_inspire_index(bboxes, None, None, cell_size=500.0)
|
|
assert len(idx.candidate_indices((100.0, 100.0, 110.0, 110.0))) == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Parallel OA processing must match the sequential result exactly
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestParallelProcessing:
|
|
"""_process_oas across workers must produce the same fragments as workers=1.
|
|
Uses single-postcode OAs (fast path), so it exercises the chunking + WKB
|
|
round-trip + fork machinery without needing INSPIRE data."""
|
|
|
|
@staticmethod
|
|
def _inputs(n_oas=60):
|
|
import pyarrow as pa
|
|
|
|
oa_geoms = {
|
|
f"E{i:08d}": box(i * 100.0, 0.0, i * 100.0 + 50.0, 50.0)
|
|
for i in range(n_oas)
|
|
}
|
|
codes = sorted(oa_geoms)
|
|
east, north, pcs = [], [], []
|
|
offsets = {}
|
|
pos = 0
|
|
for i, code in enumerate(codes):
|
|
east += [i * 100.0 + 10.0, i * 100.0 + 20.0]
|
|
north += [10.0, 20.0]
|
|
pcs += [f"AA{i % 5} {i % 9}AA"] * 2 # one postcode per OA → fast path
|
|
offsets[code] = (pos, pos + 2)
|
|
pos += 2
|
|
return (
|
|
codes,
|
|
oa_geoms,
|
|
np.array(east),
|
|
np.array(north),
|
|
pa.array(pcs, type=pa.large_string()),
|
|
offsets,
|
|
)
|
|
|
|
@staticmethod
|
|
def _norm(frags):
|
|
return sorted((pc, geom.wkb_hex) for pc, geom in frags)
|
|
|
|
def test_parallel_matches_sequential(self):
|
|
codes, oa, east, north, pcs, offs = self._inputs()
|
|
seq, s1 = _process_oas(codes, oa, east, north, pcs, offs, None, workers=1)
|
|
par, s2 = _process_oas(codes, oa, east, north, pcs, offs, None, workers=3)
|
|
assert len(seq) == len(codes) # one fragment per single-postcode OA
|
|
assert s1 == s2 == len(codes)
|
|
assert self._norm(seq) == self._norm(par)
|
|
|
|
def test_oa_failure_is_tagged_with_oa_code(self):
|
|
"""A failure inside per-OA processing must re-raise with the OA code, so a
|
|
single bad OA is attributable instead of an anonymous worker abort."""
|
|
# Missing OA in the geoms dict → KeyError, wrapped with the OA code.
|
|
with pytest.raises(RuntimeError, match="E00099999"):
|
|
_oa_fragments("E00099999", {}, None, None, None, {}, None)
|
|
|
|
|
|
class TestDegenerateGeometryHandling:
|
|
"""Every active postcode must keep a boundary (validate_outputs is strict),
|
|
so a sub-grid sliver is fattened rather than dropped. A genuinely empty
|
|
geometry is skipped without aborting the whole write (the 10h regression)."""
|
|
|
|
# Three near-collinear vertices in BNG: bbox ~28m x 7m but area ~0.04 m²,
|
|
# i.e. AL10 0TU. Without the rescue it snaps to empty at output precision.
|
|
SLIVER = Polygon(
|
|
[(523045.34, 209625.56), (523040.47, 209624.33), (523017.0, 209618.42)]
|
|
)
|
|
|
|
def test_sliver_is_rescued_to_valid_geometry(self):
|
|
from shapely.geometry import shape
|
|
|
|
result = to_wgs84_geojson(self.SLIVER)
|
|
assert result is not None, "sliver must be rescued, not dropped"
|
|
rt = shape(result)
|
|
assert not rt.is_empty
|
|
assert rt.is_valid
|
|
|
|
def test_collinear_zero_area_input_is_rescued(self):
|
|
"""A zero-area collinear 'polygon' (can't be cleaned to a polygon) must
|
|
still be rescued via the representative-point fallback, not dropped."""
|
|
from shapely.geometry import shape
|
|
|
|
degenerate = Polygon(
|
|
[(523000, 209600), (523010, 209600), (523020, 209600), (523000, 209600)]
|
|
)
|
|
assert degenerate.area == 0.0
|
|
result = to_wgs84_geojson(degenerate)
|
|
assert result is not None, "degenerate input must be rescued, not dropped"
|
|
rt = shape(result)
|
|
assert not rt.is_empty
|
|
assert rt.is_valid
|
|
|
|
def test_sliver_postcode_present_in_output(self, tmp_path):
|
|
postcodes = {
|
|
"AA1 1AA": box(530000, 180000, 530100, 180100),
|
|
"AA1 1AB": self.SLIVER, # must survive
|
|
}
|
|
file_count = write_district_geojson(postcodes, tmp_path)
|
|
assert file_count == 1
|
|
collection = json.loads((tmp_path / "units" / "AA1.geojson").read_text())
|
|
written = {f["properties"]["postcodes"] for f in collection["features"]}
|
|
assert written == {"AA1 1AA", "AA1 1AB"}
|
|
|
|
def test_empty_geometry_skipped_not_raised(self, tmp_path):
|
|
# The last-resort safety net: an unrescuable (empty) geometry is skipped
|
|
# so one bad postcode can never abort a multi-hour run.
|
|
postcodes = {
|
|
"AA1 1AA": box(530000, 180000, 530100, 180100),
|
|
"AA1 1AB": Polygon(), # genuinely empty
|
|
}
|
|
file_count = write_district_geojson(postcodes, tmp_path)
|
|
assert file_count == 1
|
|
collection = json.loads((tmp_path / "units" / "AA1.geojson").read_text())
|
|
written = {f["properties"]["postcodes"] for f in collection["features"]}
|
|
assert written == {"AA1 1AA"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# fragments_cache round-trips Phase 3 output and validates freshness
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestFragmentsCache:
|
|
"""Persisting Phase 3 lets a crashed run resume without the ~10h OA loop."""
|
|
|
|
def test_round_trip_preserves_postcodes_and_geometry(self, tmp_path):
|
|
fragments = [
|
|
("AA1 1AA", box(0, 0, 100, 100)),
|
|
("AA1 1AB", box(200, 200, 250, 260)),
|
|
# A postcode spanning multiple OAs appears as repeated entries.
|
|
("AA1 1AA", box(100, 0, 150, 100)),
|
|
("AA1 1AC", MultiPolygon([box(0, 0, 10, 10), box(20, 20, 30, 30)])),
|
|
]
|
|
cache = tmp_path / "fragments_cache.parquet"
|
|
save_fragments(cache, fragments)
|
|
loaded = load_fragments(cache)
|
|
|
|
assert [pc for pc, _ in loaded] == [pc for pc, _ in fragments]
|
|
for (_, original), (_, restored) in zip(fragments, loaded):
|
|
assert restored.equals(original)
|
|
|
|
def test_save_is_atomic_no_tmp_left_behind(self, tmp_path):
|
|
cache = tmp_path / "fragments_cache.parquet"
|
|
save_fragments(cache, [("AA1 1AA", box(0, 0, 1, 1))])
|
|
assert cache.exists()
|
|
assert not (tmp_path / "fragments_cache.parquet.tmp").exists()
|
|
|
|
def test_missing_cache_is_not_fresh(self, tmp_path):
|
|
cache = tmp_path / "fragments_cache.parquet"
|
|
inp = tmp_path / "uprn.parquet"
|
|
inp.write_text("x")
|
|
assert fragments_cache_is_fresh(cache, [inp]) is False
|
|
|
|
def test_cache_newer_than_inputs_is_fresh(self, tmp_path):
|
|
import os
|
|
|
|
inp = tmp_path / "uprn.parquet"
|
|
inp.write_text("x")
|
|
cache = tmp_path / "fragments_cache.parquet"
|
|
cache.write_text("c")
|
|
os.utime(inp, (1_000, 1_000))
|
|
os.utime(cache, (2_000, 2_000))
|
|
assert fragments_cache_is_fresh(cache, [inp, None]) is True
|
|
|
|
def test_cache_older_than_any_input_is_stale(self, tmp_path):
|
|
import os
|
|
|
|
inp = tmp_path / "oa.gpkg"
|
|
inp.write_text("x")
|
|
cache = tmp_path / "fragments_cache.parquet"
|
|
cache.write_text("c")
|
|
os.utime(cache, (1_000, 1_000))
|
|
os.utime(inp, (2_000, 2_000)) # input touched after the cache
|
|
assert fragments_cache_is_fresh(cache, [inp]) is False
|
|
|
|
def test_missing_input_is_ignored(self, tmp_path):
|
|
cache = tmp_path / "fragments_cache.parquet"
|
|
cache.write_text("c")
|
|
# arcgis is optional/absent — it cannot have invalidated the cache.
|
|
assert fragments_cache_is_fresh(cache, [tmp_path / "absent.parquet"]) is True
|