# perfect-postcode/pipeline/transform/test_tree_density.py
# 2026-05-31 20:20:41 +01:00
#
# 253 lines
# 9.4 KiB
# Python

import math
import zipfile
from pathlib import Path
import numpy as np
import polars as pl
import pyogrio
import pytest
import shapely
from pipeline.transform.tree_density import (
_accumulate_clipped_area,
_coverage_percentile_expr,
_finalize_metrics,
_geometry_column,
_layers,
_metric_columns,
_nfi_dataset_path,
_postcode_buffers,
_postcode_density_percentile_col,
_safe_extract_zip_dir,
_with_postcode_density_percentiles,
)
def test_accumulate_clipped_area_adds_only_in_buffer_overlap() -> None:
    """Accumulated canopy is the polygon/buffer intersection, never the full polygon."""
    radius_m = 50
    buffer_area = math.pi * radius_m * radius_m
    postcodes = pl.DataFrame(
        {"postcode": ["A", "B"], "x": [0.0, 1000.0], "y": [0.0, 0.0]}
    )
    circles, tree = _postcode_buffers(postcodes, radius_m)

    # Scenario 1: a 1,000,000 sqm square centred on A swallows A's whole buffer.
    covering_square = shapely.box(-500, -500, 500, 500)
    covered_totals = np.zeros(2)
    _accumulate_clipped_area(
        np.array([covering_square], dtype=object), circles, tree, covered_totals
    )
    # The buffer is a 32-gon, so the clipped area matches the ideal circle to
    # about 1% and must stay at or below it -- nowhere near the square's area.
    assert covered_totals[0] == pytest.approx(buffer_area, rel=1e-2)
    assert covered_totals[0] <= buffer_area
    # B sits 1km away from the square, so it receives nothing.
    assert covered_totals[1] == 0.0

    # Scenario 2: a large parcel whose left edge reaches only 10m into B's
    # circle. Only that thin segment may be credited to B; crediting the whole
    # parcel is the error a centroid/full-area approach would make.
    edge_parcel = shapely.box(1040, -500, 2000, 500)
    sliver_totals = np.zeros(2)
    _accumulate_clipped_area(
        np.array([edge_parcel], dtype=object), circles, tree, sliver_totals
    )
    assert sliver_totals[0] == 0.0
    assert 0.0 < sliver_totals[1] < buffer_area
def test_accumulate_clipped_area_drops_missing_and_empty_geometry() -> None:
    """``None`` and empty geometries are skipped; real geometry still counts."""
    radius_m = 50
    origin = pl.DataFrame({"postcode": ["A"], "x": [0.0], "y": [0.0]})
    circles, tree = _postcode_buffers(origin, radius_m)

    missing_geom = None
    empty_geom = shapely.from_wkt("POLYGON EMPTY")
    real_box = shapely.box(-10, -10, 10, 10)  # 400 sqm, fully inside the buffer
    geoms = np.array([missing_geom, empty_geom, real_box], dtype=object)

    accumulated = np.zeros(1)
    # Must not raise on the None / empty entries.
    _accumulate_clipped_area(geoms, circles, tree, accumulated)

    # Only the genuine box contributes.
    assert accumulated[0] == pytest.approx(400.0)
def test_accumulate_clipped_area_height_weighted_by_overlap() -> None:
    """Mean height is weighted by clipped area and ignores NaN-height parcels."""
    radius_m = 50
    origin = pl.DataFrame({"postcode": ["A"], "x": [0.0], "y": [0.0]})
    circles, tree = _postcode_buffers(origin, radius_m)

    # (geometry, height) pairs; every box lies fully inside the 50m buffer.
    parcels = [
        (shapely.box(-10, -10, 0, 0), 5.0),  # 100 sqm
        (shapely.box(0, 0, 20, 20), 10.0),  # 400 sqm
        (shapely.box(-5, 0, 0, 5), np.nan),  # 25 sqm, no usable height
    ]
    geoms = np.array([geom for geom, _ in parcels], dtype=object)
    height = np.array([metres for _, metres in parcels])

    canopy_area = np.zeros(1)
    height_weighted_sum = np.zeros(1)
    height_weight = np.zeros(1)
    _accumulate_clipped_area(
        geoms,
        circles,
        tree,
        canopy_area,
        height=height,
        height_weighted_sum=height_weighted_sum,
        height_weight=height_weight,
    )

    # Canopy counts all three parcels: 100 + 400 + 25.
    assert canopy_area[0] == pytest.approx(525.0)
    # The height weight counts only the two finite-height parcels: 100 + 400.
    assert height_weight[0] == pytest.approx(500.0)
    expected_mean = (5.0 * 100 + 10.0 * 400) / 500  # 9.0
    observed_mean = height_weighted_sum[0] / height_weight[0]
    assert observed_mean == pytest.approx(expected_mean)
def test_coverage_percentile_expr_ranks_higher_coverage_higher() -> None:
    """Distinct coverages spread evenly over 0..100; a null input stays null."""
    coverage = pl.DataFrame({"coverage": [0.0, 5.0, 10.0, None]})
    percentile_expr = _coverage_percentile_expr("coverage", "percentile")

    ranked = coverage.lazy().with_columns(percentile_expr).collect()

    expected = [0.0, 50.0, 100.0, None]
    assert ranked["percentile"].to_list() == expected
def test_coverage_percentile_expr_uses_tie_consistent_average_rank() -> None:
    """Tied values share their mean rank, including ties at either extreme."""
    # The tied minimum and maximum are not pinned to exactly 0 / 100; they get
    # the same average-rank treatment as every other value on the scale.
    coverage = pl.DataFrame({"coverage": [0.0, 0.0, 5.0, 10.0, 10.0]})
    percentile_expr = _coverage_percentile_expr("coverage", "percentile")

    ranked = coverage.lazy().with_columns(percentile_expr).collect()

    expected = [12.5, 12.5, 50.0, 87.5, 87.5]
    assert ranked["percentile"].to_list() == expected
def test_coverage_percentile_expr_all_equal_is_neutral_midpoint() -> None:
    """With no variation among non-null values, each one maps to 50.0."""

    def percentiles(df: pl.DataFrame) -> list:
        # Run the expression through a lazy frame and return the new column.
        ranked = (
            df.lazy()
            .with_columns(_coverage_percentile_expr("coverage", "percentile"))
            .collect()
        )
        return ranked["percentile"].to_list()

    # (input coverage values, expected percentiles)
    scenarios = [
        ([5.0, 5.0, 5.0], [50.0, 50.0, 50.0]),  # all equal
        ([7.0], [50.0]),  # single value
        ([None, 5.0, 5.0, 5.0], [None, 50.0, 50.0, 50.0]),  # null stays null
    ]
    for values, expected in scenarios:
        assert percentiles(pl.DataFrame({"coverage": values})) == expected
def test_finalize_metrics_caps_density_keeps_raw_area_and_weights_height() -> None:
    """Density is capped at 100, area stays raw, height is a weighted mean."""
    radius_m = 50
    buffer_area = math.pi * radius_m * radius_m
    half_buffer = buffer_area * 0.5
    double_buffer = buffer_area * 2.0
    density_col, area_col, height_col = _metric_columns(radius_m)

    points = pl.DataFrame({"postcode": ["AA1 1AA", "AA1 1AB", "AA1 1AC"]})
    canopy_area = np.array([0.0, half_buffer, double_buffer])
    # Only the middle postcode has height samples: 500 / 100 = 5.
    # The other two have zero weight and are expected to come out null.
    height_weighted_sum = np.array([0.0, 500.0, 0.0])
    height_weight = np.array([0.0, 100.0, 0.0])

    metrics = _finalize_metrics(
        points, canopy_area, height_weighted_sum, height_weight, radius_m
    )

    # 200% raw coverage is reported as 100.
    assert metrics[density_col].to_list() == [0.0, 50.0, 100.0]
    # The area column keeps the uncapped clipped accumulation.
    expected_area = [0.0, round(half_buffer, 1), round(double_buffer, 1)]
    assert metrics[area_col].to_list() == pytest.approx(expected_area)
    assert metrics[height_col].to_list() == [None, 5.0, None]
    # The mixed-unit feature-count column is no longer part of the output.
    assert "Tree features within 50m" not in metrics.columns
    assert set(metrics.columns) == {"postcode", density_col, area_col, height_col}
def test_postcode_density_percentiles_rank_over_density() -> None:
    """The percentile column is added and orders postcodes as density does."""
    radius_m = 50
    density_col, area_col, height_col = _metric_columns(radius_m)
    percentile_col = _postcode_density_percentile_col(radius_m)

    per_postcode = pl.DataFrame(
        {
            "postcode": ["AA1 1AA", "AA1 1AB", "AA1 1AC"],
            density_col: [10.0, 30.0, 50.0],
            area_col: [100.0, 300.0, 500.0],
            height_col: [4.0, 6.0, 8.0],
        }
    )
    metrics = _with_postcode_density_percentiles(per_postcode, radius_m)

    assert percentile_col in metrics.columns
    assert metrics[percentile_col].to_list() == [0.0, 50.0, 100.0]
def test_safe_extract_zip_dir_rejects_path_traversal(tmp_path: Path) -> None:
    """A member named with ``..`` makes extraction raise ``ValueError``."""
    hostile_zip = tmp_path / "evil.zip"
    destination = tmp_path / "extract"

    # Single member whose name climbs out of the extraction directory.
    with zipfile.ZipFile(hostile_zip, "w") as archive:
        archive.writestr("../escape.txt", "pwned")

    with pytest.raises(ValueError, match="Unsafe path"):
        _safe_extract_zip_dir(hostile_zip, destination, force=True)
def test_safe_extract_zip_dir_extracts_benign_archive(tmp_path: Path) -> None:
    """A well-formed archive is unpacked and the target directory is returned."""
    archive_path = tmp_path / "ok.zip"
    extract_dir = tmp_path / "extract"
    with zipfile.ZipFile(archive_path, "w") as archive:
        archive.writestr("data/x.txt", "hello")

    returned = _safe_extract_zip_dir(archive_path, extract_dir, force=True)

    assert returned == extract_dir
    extracted_file = extract_dir / "data" / "x.txt"
    assert extracted_file.read_text() == "hello"
def test_geometry_column_resolution() -> None:
    """Check which column ``_geometry_column`` picks for each metadata shape."""
    # (metadata, available columns, expected geometry column)
    scenarios = [
        # An explicit geometry_name in the metadata is used.
        ({"geometry_name": "SHAPE"}, ["MEANHT", "SHAPE"], "SHAPE"),
        # No metadata entry: "wkb_geometry" is picked from the columns.
        ({}, ["a", "wkb_geometry", "b"], "wkb_geometry"),
        # geometry_name is None: "geom" is picked from the columns.
        ({"geometry_name": None}, ["x", "geom"], "geom"),
        # Nothing recognisable: the last column is the fallback.
        ({}, ["a", "b", "c"], "c"),
    ]
    for meta, columns, expected in scenarios:
        assert _geometry_column(meta, columns) == expected
def _zip_with_shapefiles(zip_path: Path, names: list[str]) -> None:
with zipfile.ZipFile(zip_path, "w") as archive:
for name in names:
archive.writestr(name, "")
def test_nfi_dataset_path_requires_exactly_one_shapefile(tmp_path: Path) -> None:
    """Two shapefiles or none are errors; exactly one resolves to its path."""
    # Two .shp members: ambiguous, so a ValueError is expected.
    two_shp_zip = tmp_path / "multi.zip"
    _zip_with_shapefiles(two_shp_zip, ["a.shp", "b.shp"])
    with pytest.raises(ValueError, match="exactly one shapefile"):
        _nfi_dataset_path(
            two_shp_zip, tmp_path / "multi_x", force_extract=True, use_vsizip=False
        )

    # No .shp member at all: FileNotFoundError is expected.
    no_shp_zip = tmp_path / "none.zip"
    _zip_with_shapefiles(no_shp_zip, ["readme.txt"])
    with pytest.raises(FileNotFoundError):
        _nfi_dataset_path(
            no_shp_zip, tmp_path / "none_x", force_extract=True, use_vsizip=False
        )

    # One .shp next to a sidecar file: the returned path ends with the .shp.
    single_shp_zip = tmp_path / "one.zip"
    _zip_with_shapefiles(single_shp_zip, ["woodland.shp", "woodland.dbf"])
    dataset_path = _nfi_dataset_path(
        single_shp_zip, tmp_path / "one_x", force_extract=True, use_vsizip=False
    )
    assert dataset_path.endswith("woodland.shp")
def test_layers_selection_and_unknown(monkeypatch: pytest.MonkeyPatch) -> None:
    """``_layers`` lists all layers, filters to a request, rejects unknown names."""

    def fake_list_layers(_path):
        # Stand-in for pyogrio.list_layers: two polygon layers for any path.
        return [("L1", "Polygon"), ("L2", "Polygon")]

    monkeypatch.setattr(pyogrio, "list_layers", fake_list_layers)

    # No selection: every layer name is returned.
    assert _layers("ignored", None) == ["L1", "L2"]
    # Explicit selection: only the requested layer is returned.
    assert _layers("ignored", ("L2",)) == ["L2"]
    # Requesting a layer that is not in the dataset raises.
    with pytest.raises(ValueError, match="Unknown TOW layer"):
        _layers("ignored", ("L3",))