"""Unit tests for the private helpers of ``pipeline.transform.tree_density``."""

import math
import zipfile
from pathlib import Path

import numpy as np
import polars as pl
import pyogrio
import pytest
import shapely

from pipeline.transform import tree_density
from pipeline.transform.tree_density import (
    _accumulate_clipped_area,
    _coverage_percentile_expr,
    _finalize_metrics,
    _geometry_column,
    _layers,
    _metric_columns,
    _nfi_dataset_path,
    _postcode_buffers,
    _postcode_density_percentile_col,
    _safe_extract_zip_dir,
    _with_postcode_density_percentiles,
)


def test_accumulate_clipped_area_adds_only_in_buffer_overlap() -> None:
    """Only the part of a polygon inside a postcode buffer is accumulated."""
    radius_m = 50
    points = pl.DataFrame({"postcode": ["A", "B"], "x": [0.0, 1000.0], "y": [0.0, 0.0]})
    circles, tree = _postcode_buffers(points, radius_m)
    buffer_area = math.pi * radius_m * radius_m

    # A large square centred on postcode A fully covers A's buffer circle.
    canopy_area = np.zeros(2)
    big = shapely.box(-500, -500, 500, 500)  # 1,000,000 sqm parcel
    _accumulate_clipped_area(np.array([big], dtype=object), circles, tree, canopy_area)
    # Only the clipped circle area is added (the 32-gon buffer approximates the
    # circle to ~1%), NOT the full 1,000,000 sqm polygon.
    assert canopy_area[0] == pytest.approx(buffer_area, rel=1e-2)
    assert canopy_area[0] <= buffer_area  # never exceeds the true buffer area
    assert canopy_area[1] == 0.0  # postcode B is 1km away, no overlap

    # A large parcel that only slivers into B's circle must add only the sliver,
    # not its full area -- the failure mode a centroid/full-area path could not avoid.
    canopy_area = np.zeros(2)
    sliver = shapely.box(1040, -500, 2000, 500)  # left edge 10m inside B's circle
    _accumulate_clipped_area(
        np.array([sliver], dtype=object), circles, tree, canopy_area
    )
    assert canopy_area[0] == 0.0
    assert 0.0 < canopy_area[1] < buffer_area  # tiny segment, far below 1M sqm


def test_accumulate_clipped_area_drops_missing_and_empty_geometry() -> None:
    """``None`` and empty geometries are skipped; real ones are still counted."""
    radius_m = 50
    points = pl.DataFrame({"postcode": ["A"], "x": [0.0], "y": [0.0]})
    circles, tree = _postcode_buffers(points, radius_m)
    canopy_area = np.zeros(1)
    geoms = np.array(
        [None, shapely.from_wkt("POLYGON EMPTY"), shapely.box(-10, -10, 10, 10)],
        dtype=object,
    )
    # A None and an empty geometry must be skipped, not crash, and only the real
    # 400 sqm box is accumulated (it is fully inside the buffer).
    _accumulate_clipped_area(geoms, circles, tree, canopy_area)
    assert canopy_area[0] == pytest.approx(400.0)


def test_accumulate_clipped_area_survives_invalid_polygon() -> None:
    """A self-intersecting external polygon (TOW/NFI data is occasionally invalid)
    must not abort the batched overlay with 'TopologyException: side location
    conflict'; its repaired in-buffer area is still accumulated."""
    radius_m = 50
    points = pl.DataFrame({"postcode": ["A"], "x": [0.0], "y": [0.0]})
    circles, tree = _postcode_buffers(points, radius_m)
    # Bow-tie centred on A: self-intersecting => invalid. The raw batched
    # shapely.intersection raises 'side location conflict' on it; make_valid splits
    # it into two triangles of total area 200, fully inside A's radius-50 buffer.
    bowtie = shapely.Polygon([(-10, -10), (10, 10), (10, -10), (-10, 10), (-10, -10)])
    assert not shapely.is_valid(bowtie)  # precondition
    # NOTE(review): whether the raw overlay raises here presumably depends on the
    # installed GEOS version -- confirm against the pinned shapely/GEOS build.
    with pytest.raises(shapely.errors.GEOSException):  # documents the raw hazard
        shapely.intersection(
            np.array([bowtie], dtype=object), np.array([circles[0]], dtype=object)
        )
    canopy_area = np.zeros(1)
    _accumulate_clipped_area(np.array([bowtie], dtype=object), circles, tree, canopy_area)
    assert canopy_area[0] == pytest.approx(200.0, rel=1e-3)


def test_robust_intersection_area_recovers_from_overlay_failure(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """The batched-overlay fallback must absorb a GEOSException from the fast path
    and recover (validate + retry), returning the correct per-pair areas. Version
    independent: the fast-path failure is forced rather than data-dependent."""
    real_intersection = shapely.intersection
    calls = {"n": 0}

    def flaky(a, b, **kwargs):
        """Raise on the first call only, then delegate to the real overlay."""
        calls["n"] += 1
        if calls["n"] == 1:  # fail only the first (fast-path) call
            raise shapely.errors.GEOSException("forced side location conflict")
        return real_intersection(a, b, **kwargs)

    monkeypatch.setattr(tree_density.shapely, "intersection", flaky)
    a = np.array([shapely.box(0, 0, 10, 10), shapely.box(0, 0, 4, 4)], dtype=object)
    b = np.array([shapely.box(0, 0, 6, 6), shapely.box(0, 0, 2, 2)], dtype=object)
    out = tree_density._robust_intersection_area(a, b)
    assert calls["n"] >= 2  # fast path failed -> fallback path executed
    assert out.tolist() == pytest.approx([36.0, 4.0])


def test_accumulate_clipped_area_height_weighted_by_overlap() -> None:
    """Heights are averaged by clipped area; NaN heights count for area only."""
    radius_m = 50
    points = pl.DataFrame({"postcode": ["A"], "x": [0.0], "y": [0.0]})
    circles, tree = _postcode_buffers(points, radius_m)
    canopy_area = np.zeros(1)
    height_weighted_sum = np.zeros(1)
    height_weight = np.zeros(1)
    geoms = np.array(
        [
            shapely.box(-10, -10, 0, 0),  # 100 sqm, fully inside
            shapely.box(0, 0, 20, 20),  # 400 sqm, fully inside
            shapely.box(-5, 0, 0, 5),  # 25 sqm, NaN height -> ignored for height
        ],
        dtype=object,
    )
    height = np.array([5.0, 10.0, np.nan])
    _accumulate_clipped_area(
        geoms,
        circles,
        tree,
        canopy_area,
        height=height,
        height_weighted_sum=height_weighted_sum,
        height_weight=height_weight,
    )
    # All three clipped areas count toward canopy; only the finite-height ones
    # contribute to the area-weighted mean height.
    assert canopy_area[0] == pytest.approx(525.0)
    assert height_weight[0] == pytest.approx(500.0)
    mean_height = height_weighted_sum[0] / height_weight[0]
    assert mean_height == pytest.approx((5.0 * 100 + 10.0 * 400) / 500)  # 9.0


def test_coverage_percentile_expr_ranks_higher_coverage_higher() -> None:
    """Distinct coverages map to 0/50/100 and a null coverage stays null."""
    df = pl.DataFrame({"coverage": [0.0, 5.0, 10.0, None]})
    result = df.lazy().with_columns(
        _coverage_percentile_expr("coverage", "percentile")
    ).collect()
    assert result["percentile"].to_list() == [0.0, 50.0, 100.0, None]


def test_coverage_percentile_expr_uses_tie_consistent_average_rank() -> None:
    """Tied values share one percentile derived from their mean rank."""
    # Tied extremes share their mean rank instead of being pinned to exact 0/100,
    # so the whole scale runs on one consistent average-rank formula.
    df = pl.DataFrame({"coverage": [0.0, 0.0, 5.0, 10.0, 10.0]})
    result = df.lazy().with_columns(
        _coverage_percentile_expr("coverage", "percentile")
    ).collect()
    assert result["percentile"].to_list() == [12.5, 12.5, 50.0, 87.5, 87.5]


def test_coverage_percentile_expr_all_equal_is_neutral_midpoint() -> None:
    """All-equal and single-value inputs get 50.0; nulls remain null."""
    all_equal = pl.DataFrame({"coverage": [5.0, 5.0, 5.0]})
    single = pl.DataFrame({"coverage": [7.0]})
    with_null = pl.DataFrame({"coverage": [None, 5.0, 5.0, 5.0]})

    def percentiles(df: pl.DataFrame) -> list:
        """Return the computed ``percentile`` column of ``df`` as a list."""
        return (
            df.lazy()
            .with_columns(_coverage_percentile_expr("coverage", "percentile"))
            .collect()["percentile"]
            .to_list()
        )

    assert percentiles(all_equal) == [50.0, 50.0, 50.0]
    assert percentiles(single) == [50.0]
    assert percentiles(with_null) == [None, 50.0, 50.0, 50.0]


def test_finalize_metrics_caps_density_keeps_raw_area_and_weights_height() -> None:
    """Density is capped at 100, area stays raw, height is the weighted mean."""
    radius_m = 50
    buffer_area = math.pi * radius_m * radius_m
    density_col, area_col, height_col = _metric_columns(radius_m)
    points = pl.DataFrame({"postcode": ["AA1 1AA", "AA1 1AB", "AA1 1AC"]})
    canopy_area = np.array([0.0, buffer_area * 0.5, buffer_area * 2.0])
    # Postcode 0: no height samples -> null. Postcode 1: area-weighted mean = 5.
    height_weighted_sum = np.array([0.0, 500.0, 0.0])
    height_weight = np.array([0.0, 100.0, 0.0])
    metrics = _finalize_metrics(
        points, canopy_area, height_weighted_sum, height_weight, radius_m
    )
    assert metrics[density_col].to_list() == [0.0, 50.0, 100.0]  # capped at 100
    # area_col is the raw clipped accumulation, intentionally uncapped.
    assert metrics[area_col].to_list() == pytest.approx(
        [0.0, round(buffer_area * 0.5, 1), round(buffer_area * 2.0, 1)]
    )
    assert metrics[height_col].to_list() == [None, 5.0, None]
    # The mixed-unit feature-count column has been removed entirely.
    assert "Tree features within 50m" not in metrics.columns
    assert set(metrics.columns) == {"postcode", density_col, area_col, height_col}


def test_postcode_density_percentiles_rank_over_density() -> None:
    """The percentile column is added and ordered by the density column."""
    radius_m = 50
    density_col, area_col, height_col = _metric_columns(radius_m)
    percentile_col = _postcode_density_percentile_col(radius_m)
    metrics = _with_postcode_density_percentiles(
        pl.DataFrame(
            {
                "postcode": ["AA1 1AA", "AA1 1AB", "AA1 1AC"],
                density_col: [10.0, 30.0, 50.0],
                area_col: [100.0, 300.0, 500.0],
                height_col: [4.0, 6.0, 8.0],
            }
        ),
        radius_m,
    )
    assert percentile_col in metrics.columns
    assert metrics[percentile_col].to_list() == [0.0, 50.0, 100.0]


def test_safe_extract_zip_dir_rejects_path_traversal(tmp_path: Path) -> None:
    """An archive member named ``../escape.txt`` raises ``ValueError``."""
    malicious = tmp_path / "evil.zip"
    with zipfile.ZipFile(malicious, "w") as archive:
        archive.writestr("../escape.txt", "pwned")
    with pytest.raises(ValueError, match="Unsafe path"):
        _safe_extract_zip_dir(malicious, tmp_path / "extract", force=True)


def test_safe_extract_zip_dir_extracts_benign_archive(tmp_path: Path) -> None:
    """A well-formed archive is extracted and the target directory returned."""
    benign = tmp_path / "ok.zip"
    with zipfile.ZipFile(benign, "w") as archive:
        archive.writestr("data/x.txt", "hello")
    extract_dir = tmp_path / "extract"
    result = _safe_extract_zip_dir(benign, extract_dir, force=True)
    assert result == extract_dir
    assert (extract_dir / "data" / "x.txt").read_text() == "hello"


def test_geometry_column_resolution() -> None:
    """The geometry column is resolved for each metadata/column combination."""
    assert _geometry_column({"geometry_name": "SHAPE"}, ["MEANHT", "SHAPE"]) == "SHAPE"
    assert _geometry_column({}, ["a", "wkb_geometry", "b"]) == "wkb_geometry"
    assert _geometry_column({"geometry_name": None}, ["x", "geom"]) == "geom"
    assert _geometry_column({}, ["a", "b", "c"]) == "c"  # last-column fallback


def _zip_with_shapefiles(zip_path: Path, names: list[str]) -> None:
    """Write a zip archive at ``zip_path`` with one empty member per name."""
    with zipfile.ZipFile(zip_path, "w") as archive:
        for name in names:
            archive.writestr(name, "")


def test_nfi_dataset_path_requires_exactly_one_shapefile(tmp_path: Path) -> None:
    """Zero or several ``.shp`` members are rejected; exactly one is resolved."""
    multi = tmp_path / "multi.zip"
    _zip_with_shapefiles(multi, ["a.shp", "b.shp"])
    with pytest.raises(ValueError, match="exactly one shapefile"):
        _nfi_dataset_path(multi, tmp_path / "multi_x", force_extract=True, use_vsizip=False)

    none = tmp_path / "none.zip"
    _zip_with_shapefiles(none, ["readme.txt"])
    with pytest.raises(FileNotFoundError):
        _nfi_dataset_path(none, tmp_path / "none_x", force_extract=True, use_vsizip=False)

    one = tmp_path / "one.zip"
    _zip_with_shapefiles(one, ["woodland.shp", "woodland.dbf"])
    resolved = _nfi_dataset_path(
        one, tmp_path / "one_x", force_extract=True, use_vsizip=False
    )
    assert resolved.endswith("woodland.shp")


def test_layers_selection_and_unknown(monkeypatch: pytest.MonkeyPatch) -> None:
    """All layers are returned by default, a subset on request, unknowns raise."""
    monkeypatch.setattr(
        pyogrio,
        "list_layers",
        lambda _path: [("L1", "Polygon"), ("L2", "Polygon")],
    )
    assert _layers("ignored", None) == ["L1", "L2"]
    assert _layers("ignored", ("L2",)) == ["L2"]
    with pytest.raises(ValueError, match="Unknown TOW layer"):
        _layers("ignored", ("L3",))