300 lines
12 KiB
Python
300 lines
12 KiB
Python
import math
|
|
import zipfile
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
import polars as pl
|
|
import pyogrio
|
|
import pytest
|
|
import shapely
|
|
|
|
from pipeline.transform.tree_density import (
|
|
_accumulate_clipped_area,
|
|
_coverage_percentile_expr,
|
|
_finalize_metrics,
|
|
_geometry_column,
|
|
_layers,
|
|
_metric_columns,
|
|
_nfi_dataset_path,
|
|
_postcode_buffers,
|
|
_postcode_density_percentile_col,
|
|
_safe_extract_zip_dir,
|
|
_with_postcode_density_percentiles,
|
|
)
|
|
|
|
|
|
def test_accumulate_clipped_area_adds_only_in_buffer_overlap() -> None:
    """Only the in-buffer part of a parcel may be added to a postcode's total.

    Postcodes A (x=0) and B (x=1000) get 50m buffers. Two independent
    scenarios each feed one very large parcel and inspect the accumulator.
    """
    radius_m = 50
    buffer_area = math.pi * radius_m * radius_m
    postcodes = pl.DataFrame(
        {"postcode": ["A", "B"], "x": [0.0, 1000.0], "y": [0.0, 0.0]}
    )
    circles, tree = _postcode_buffers(postcodes, radius_m)

    # Scenario 1: a 1,000,000 sqm square centred on A covers A's whole buffer.
    covering_parcel = np.array([shapely.box(-500, -500, 500, 500)], dtype=object)
    totals = np.zeros(2)
    _accumulate_clipped_area(covering_parcel, circles, tree, totals)
    # The clipped circle is what gets added (the 32-gon buffer approximates the
    # circle to ~1%), NOT the full 1,000,000 sqm polygon.
    assert totals[0] == pytest.approx(buffer_area, rel=1e-2)
    # The accumulated value never exceeds the true buffer area.
    assert totals[0] <= buffer_area
    # Postcode B is 1km away, so nothing overlaps it.
    assert totals[1] == 0.0

    # Scenario 2: a large parcel whose left edge sits 10m inside B's circle.
    # Only that sliver may count -- the failure mode a centroid/full-area path
    # could not avoid.
    sliver_parcel = np.array([shapely.box(1040, -500, 2000, 500)], dtype=object)
    totals = np.zeros(2)
    _accumulate_clipped_area(sliver_parcel, circles, tree, totals)
    assert totals[0] == 0.0
    # A tiny segment: strictly positive, yet below one buffer's area.
    assert 0.0 < totals[1]
    assert totals[1] < buffer_area
|
|
|
|
|
|
def test_accumulate_clipped_area_drops_missing_and_empty_geometry() -> None:
    """A ``None`` entry and an empty polygon are skipped without raising.

    Only the real 20m x 20m box (400 sqm, fully inside the 50m buffer) is
    expected to reach the accumulator.
    """
    radius_m = 50
    single_postcode = pl.DataFrame({"postcode": ["A"], "x": [0.0], "y": [0.0]})
    circles, tree = _postcode_buffers(single_postcode, radius_m)

    missing = None
    empty = shapely.from_wkt("POLYGON EMPTY")
    real_box = shapely.box(-10, -10, 10, 10)
    geoms = np.array([missing, empty, real_box], dtype=object)

    totals = np.zeros(1)
    # Must not crash on the None / empty entries.
    _accumulate_clipped_area(geoms, circles, tree, totals)

    assert totals[0] == pytest.approx(400.0)
|
|
|
|
|
|
def test_accumulate_clipped_area_survives_invalid_polygon() -> None:
    """An invalid (self-intersecting) input polygon must not abort accumulation.

    External TOW/NFI data is occasionally invalid. The batched overlay must not
    die with 'TopologyException: side location conflict'; the polygon's
    repaired in-buffer area is still expected in the accumulator.
    """
    radius_m = 50
    single_postcode = pl.DataFrame({"postcode": ["A"], "x": [0.0], "y": [0.0]})
    circles, tree = _postcode_buffers(single_postcode, radius_m)

    # Bow-tie centred on A: self-intersecting => invalid. make_valid splits it
    # into two triangles of total area 200, fully inside A's radius-50 buffer.
    bowtie = shapely.Polygon([(-10, -10), (10, 10), (10, -10), (-10, 10), (-10, -10)])
    bowtie_batch = np.array([bowtie], dtype=object)

    # Precondition: the fixture really is invalid.
    assert not shapely.is_valid(bowtie)

    # Documents the raw hazard: the plain batched shapely.intersection raises
    # 'side location conflict' on this input.
    first_circle = np.array([circles[0]], dtype=object)
    with pytest.raises(shapely.errors.GEOSException):
        shapely.intersection(bowtie_batch, first_circle)

    totals = np.zeros(1)
    _accumulate_clipped_area(
        np.array([bowtie], dtype=object), circles, tree, totals
    )
    assert totals[0] == pytest.approx(200.0, rel=1e-3)
|
|
|
|
|
|
def test_robust_intersection_area_recovers_from_overlay_failure(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """The batched-overlay fallback must absorb a fast-path ``GEOSException``.

    After the forced failure the helper is expected to recover (validate +
    retry) and still return the correct per-pair areas. Version independent:
    the fast-path failure is forced rather than data-dependent.

    Args:
        monkeypatch: pytest fixture used to swap ``shapely.intersection`` (as
            seen through ``tree_density.shapely``) for a stub that fails once.
    """
    from pipeline.transform import tree_density

    # Capture the genuine function BEFORE patching: the stub delegates to it,
    # and (presumably) ``tree_density.shapely`` is the same module object as
    # ``shapely`` here, so looking it up after patching would find the stub.
    real_intersection = shapely.intersection
    calls = {"n": 0}

    def flaky(a, b, **kwargs):
        """Raise on the first call only, then defer to the real intersection."""
        calls["n"] += 1
        if calls["n"] == 1:  # fail only the first (fast-path) call
            raise shapely.errors.GEOSException("forced side location conflict")
        return real_intersection(a, b, **kwargs)

    monkeypatch.setattr(tree_density.shapely, "intersection", flaky)

    # Pairwise overlaps: 10x10 with 6x6 -> 36 sqm; 4x4 with 2x2 -> 4 sqm.
    a = np.array([shapely.box(0, 0, 10, 10), shapely.box(0, 0, 4, 4)], dtype=object)
    b = np.array([shapely.box(0, 0, 6, 6), shapely.box(0, 0, 2, 2)], dtype=object)
    out = tree_density._robust_intersection_area(a, b)

    # At least two calls proves the fast path failed and a fallback path ran.
    assert calls["n"] >= 2
    assert out.tolist() == pytest.approx([36.0, 4.0])
|
|
|
|
|
|
def test_accumulate_clipped_area_height_weighted_by_overlap() -> None:
    """Heights are averaged with clipped-area weights; NaN heights are excluded.

    Every parcel counts toward the canopy total, while only parcels with a
    finite height feed the weighted-height accumulators.
    """
    radius_m = 50
    circles, tree = _postcode_buffers(
        pl.DataFrame({"postcode": ["A"], "x": [0.0], "y": [0.0]}), radius_m
    )

    # (parcel, height) pairs; every parcel lies fully inside the buffer.
    samples = [
        (shapely.box(-10, -10, 0, 0), 5.0),  # 100 sqm
        (shapely.box(0, 0, 20, 20), 10.0),  # 400 sqm
        (shapely.box(-5, 0, 0, 5), np.nan),  # 25 sqm, NaN height -> ignored for height
    ]
    geoms = np.array([parcel for parcel, _ in samples], dtype=object)
    height = np.array([parcel_height for _, parcel_height in samples])

    canopy_area = np.zeros(1)
    height_weighted_sum = np.zeros(1)
    height_weight = np.zeros(1)

    _accumulate_clipped_area(
        geoms,
        circles,
        tree,
        canopy_area,
        height=height,
        height_weighted_sum=height_weighted_sum,
        height_weight=height_weight,
    )

    # All three clipped areas count toward canopy ...
    assert canopy_area[0] == pytest.approx(525.0)
    # ... but only the two finite-height parcels carry height weight.
    assert height_weight[0] == pytest.approx(500.0)
    expected_mean = (5.0 * 100 + 10.0 * 400) / 500  # 9.0
    observed_mean = height_weighted_sum[0] / height_weight[0]
    assert observed_mean == pytest.approx(expected_mean)
|
|
|
|
|
|
def test_coverage_percentile_expr_ranks_higher_coverage_higher() -> None:
    """Distinct coverages map to 0/50/100 and a null coverage stays null."""
    frame = pl.DataFrame({"coverage": [0.0, 5.0, 10.0, None]})
    percentile_expr = _coverage_percentile_expr("coverage", "percentile")

    ranked = frame.lazy().with_columns(percentile_expr).collect()
    observed = ranked["percentile"].to_list()

    assert observed == [0.0, 50.0, 100.0, None]
|
|
|
|
|
|
def test_coverage_percentile_expr_uses_tie_consistent_average_rank() -> None:
    """Tied values share one averaged percentile, including at the extremes.

    Tied extremes take their mean rank instead of being pinned to exact 0/100,
    so the whole scale runs on one consistent average-rank formula.
    """
    frame = pl.DataFrame({"coverage": [0.0, 0.0, 5.0, 10.0, 10.0]})
    percentile_expr = _coverage_percentile_expr("coverage", "percentile")

    ranked = frame.lazy().with_columns(percentile_expr).collect()
    observed = ranked["percentile"].to_list()

    assert observed == [12.5, 12.5, 50.0, 87.5, 87.5]
|
|
|
|
|
|
def test_coverage_percentile_expr_all_equal_is_neutral_midpoint() -> None:
    """With no spread in coverage, every non-null row lands on 50.0.

    Covers three shapes: all values equal, a single row, and all-equal values
    alongside a null (the null row is expected to stay null).
    """
    # (input frame, expected percentile column) pairs, checked in order.
    cases = [
        (pl.DataFrame({"coverage": [5.0, 5.0, 5.0]}), [50.0, 50.0, 50.0]),
        (pl.DataFrame({"coverage": [7.0]}), [50.0]),
        (
            pl.DataFrame({"coverage": [None, 5.0, 5.0, 5.0]}),
            [None, 50.0, 50.0, 50.0],
        ),
    ]

    for frame, expected in cases:
        ranked = (
            frame.lazy()
            .with_columns(_coverage_percentile_expr("coverage", "percentile"))
            .collect()
        )
        assert ranked["percentile"].to_list() == expected
|
|
|
|
|
|
def test_finalize_metrics_caps_density_keeps_raw_area_and_weights_height() -> None:
    """Check density capping, raw-area passthrough, height averaging and schema.

    Three postcodes carry canopy totals of 0x, 0.5x and 2x the buffer area;
    only the middle one has any height samples.
    """
    radius_m = 50
    density_col, area_col, height_col = _metric_columns(radius_m)
    buffer_area = math.pi * radius_m * radius_m
    half_buffer = buffer_area * 0.5
    double_buffer = buffer_area * 2.0

    points = pl.DataFrame({"postcode": ["AA1 1AA", "AA1 1AB", "AA1 1AC"]})
    canopy_area = np.array([0.0, half_buffer, double_buffer])
    # Postcode 0: no height samples -> null. Postcode 1: area-weighted mean = 5.
    height_weighted_sum = np.array([0.0, 500.0, 0.0])
    height_weight = np.array([0.0, 100.0, 0.0])

    metrics = _finalize_metrics(
        points, canopy_area, height_weighted_sum, height_weight, radius_m
    )

    # Density is capped at 100 even though the last postcode holds 2x the buffer.
    assert metrics[density_col].to_list() == [0.0, 50.0, 100.0]

    # area_col is the raw clipped accumulation, intentionally uncapped.
    expected_area = [0.0, round(half_buffer, 1), round(double_buffer, 1)]
    assert metrics[area_col].to_list() == pytest.approx(expected_area)

    assert metrics[height_col].to_list() == [None, 5.0, None]

    # The mixed-unit feature-count column has been removed entirely.
    assert "Tree features within 50m" not in metrics.columns
    expected_columns = {"postcode", density_col, area_col, height_col}
    assert set(metrics.columns) == expected_columns
|
|
|
|
|
|
def test_postcode_density_percentiles_rank_over_density() -> None:
    """A percentile column is added and follows the ordering of the density column."""
    radius_m = 50
    percentile_col = _postcode_density_percentile_col(radius_m)
    density_col, area_col, height_col = _metric_columns(radius_m)

    base_metrics = pl.DataFrame(
        {
            "postcode": ["AA1 1AA", "AA1 1AB", "AA1 1AC"],
            density_col: [10.0, 30.0, 50.0],
            area_col: [100.0, 300.0, 500.0],
            height_col: [4.0, 6.0, 8.0],
        }
    )

    ranked = _with_postcode_density_percentiles(base_metrics, radius_m)

    assert percentile_col in ranked.columns
    assert ranked[percentile_col].to_list() == [0.0, 50.0, 100.0]
|
|
|
|
|
|
def test_safe_extract_zip_dir_rejects_path_traversal(tmp_path: Path) -> None:
    """An archive member named ``../escape.txt`` must trigger ``ValueError``."""
    archive_path = tmp_path / "evil.zip"
    with zipfile.ZipFile(archive_path, "w") as zf:
        # The member name climbs out of the extraction directory.
        zf.writestr("../escape.txt", "pwned")

    destination = tmp_path / "extract"
    with pytest.raises(ValueError, match="Unsafe path"):
        _safe_extract_zip_dir(archive_path, destination, force=True)
|
|
|
|
|
|
def test_safe_extract_zip_dir_extracts_benign_archive(tmp_path: Path) -> None:
    """A well-behaved archive is unpacked and the extraction directory returned."""
    archive_path = tmp_path / "ok.zip"
    with zipfile.ZipFile(archive_path, "w") as zf:
        zf.writestr("data/x.txt", "hello")

    extract_dir = tmp_path / "extract"
    returned = _safe_extract_zip_dir(archive_path, extract_dir, force=True)

    assert returned == extract_dir
    extracted_file = extract_dir / "data" / "x.txt"
    assert extracted_file.read_text() == "hello"
|
|
|
|
|
|
def test_geometry_column_resolution() -> None:
    """Table-driven check of which column name ``_geometry_column`` selects."""
    # (metadata, available columns, expected geometry column)
    cases = [
        ({"geometry_name": "SHAPE"}, ["MEANHT", "SHAPE"], "SHAPE"),
        ({}, ["a", "wkb_geometry", "b"], "wkb_geometry"),
        ({"geometry_name": None}, ["x", "geom"], "geom"),
        ({}, ["a", "b", "c"], "c"),  # last-column fallback
    ]
    for meta, columns, expected in cases:
        assert _geometry_column(meta, columns) == expected
|
|
|
|
|
|
def _zip_with_shapefiles(zip_path: Path, names: list[str]) -> None:
    """Write a zip at ``zip_path`` containing one empty member per name.

    Args:
        zip_path: Location of the archive to create (opened in ``"w"`` mode).
        names: Member names to add, in order; every member has empty content.
    """
    with zipfile.ZipFile(zip_path, "w") as bundle:
        for member_name in names:
            bundle.writestr(member_name, "")
|
|
|
|
|
|
def test_nfi_dataset_path_requires_exactly_one_shapefile(tmp_path: Path) -> None:
    """Resolution needs exactly one ``.shp`` member in the archive.

    Two shapefiles -> ``ValueError``; none -> ``FileNotFoundError``; exactly
    one -> a path string ending in that shapefile's name.
    """
    # Every call extracts to disk rather than going through /vsizip.
    extract_opts = {"force_extract": True, "use_vsizip": False}

    # Two shapefiles: ambiguous.
    multi_zip = tmp_path / "multi.zip"
    _zip_with_shapefiles(multi_zip, ["a.shp", "b.shp"])
    with pytest.raises(ValueError, match="exactly one shapefile"):
        _nfi_dataset_path(multi_zip, tmp_path / "multi_x", **extract_opts)

    # No shapefile at all.
    none_zip = tmp_path / "none.zip"
    _zip_with_shapefiles(none_zip, ["readme.txt"])
    with pytest.raises(FileNotFoundError):
        _nfi_dataset_path(none_zip, tmp_path / "none_x", **extract_opts)

    # Exactly one shapefile (plus a sidecar) resolves.
    one_zip = tmp_path / "one.zip"
    _zip_with_shapefiles(one_zip, ["woodland.shp", "woodland.dbf"])
    resolved = _nfi_dataset_path(one_zip, tmp_path / "one_x", **extract_opts)
    assert resolved.endswith("woodland.shp")
|
|
|
|
|
|
def test_layers_selection_and_unknown(monkeypatch: pytest.MonkeyPatch) -> None:
    """``_layers`` returns all layers by default, a subset on request, and
    raises ``ValueError`` for a layer name that is not listed.

    ``pyogrio.list_layers`` is stubbed so no real dataset is opened.
    """

    def fake_list_layers(_path):
        # Two polygon layers, regardless of the path passed in.
        return [("L1", "Polygon"), ("L2", "Polygon")]

    monkeypatch.setattr(pyogrio, "list_layers", fake_list_layers)

    every_layer = _layers("ignored", None)
    assert every_layer == ["L1", "L2"]

    chosen = _layers("ignored", ("L2",))
    assert chosen == ["L2"]

    with pytest.raises(ValueError, match="Unknown TOW layer"):
        _layers("ignored", ("L3",))
|