"""Build PMTiles polygon tiles for the Trees Outside Woodland + NFI overlay.""" from __future__ import annotations import argparse import json import shutil import subprocess import tempfile from pathlib import Path import numpy as np import pyogrio import shapely from pyproj import Transformer from pipeline.local_temp import local_tmp_dir from pipeline.transform.tree_density import ( NFI_AREA_HA_COL, NFI_CATEGORY_COL, NFI_TYPE_COL, NFI_WOODLAND_VALUE, _geometry_column, _layers, _nfi_dataset_path, _tow_dataset_path, ) def _require_tippecanoe() -> str: executable = shutil.which("tippecanoe") if executable is None: raise RuntimeError( "tippecanoe is required to build tree overlay PMTiles. " "Install tippecanoe and rerun this target." ) return executable def _column_or_none(batch, names: list[str], column: str): if column not in names: return None return batch.column(names.index(column)).to_numpy(zero_copy_only=False) def _number_or_none(value) -> float | int | None: if value is None: return None try: if np.isfinite(value): if float(value).is_integer(): return int(value) return round(float(value), 2) except TypeError: return None return None def _write_tree_geojsonseq( dataset_path: str, output_path: Path, batch_size: int, layer_names: tuple[str, ...] | None, max_features_per_layer: int | None, ) -> int: to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True) layers = _layers(dataset_path, layer_names) print(f"Processing {len(layers)} TOW layer(s): {', '.join(layers)}") columns = [ "TOW_ID", "Woodland_Type", "TOW_Area_M", "MEANHT", "MINHT", "MAXHT", "LiDAR_Survey_Year", ] feature_count = 0 with output_path.open("w") as file: for layer in layers: info = pyogrio.read_info(dataset_path, layer=layer) print(f"\nLayer {layer}: {info.get('features', 0):,} features") layer_features_seen = 0 with pyogrio.open_arrow( dataset_path, layer=layer, columns=columns, batch_size=batch_size, use_pyarrow=True, ) as (meta, reader): for batch in reader: if max_features_per_layer is not None: remaining = max_features_per_layer - layer_features_seen if remaining <= 0: break if batch.num_rows > remaining: batch = batch.slice(0, remaining) layer_features_seen += batch.num_rows names = batch.schema.names geometry_column = _geometry_column(meta, names) area = np.asarray( batch.column(names.index("TOW_Area_M")).to_numpy( zero_copy_only=False ), dtype=np.float64, ) geometry = np.asarray( batch.column(names.index(geometry_column)).to_numpy( zero_copy_only=False ), dtype=object, ) valid = np.isfinite(area) & (area > 0) if not valid.any(): continue tow_id = _column_or_none(batch, names, "TOW_ID") woodland_type = _column_or_none(batch, names, "Woodland_Type") mean_height = _column_or_none(batch, names, "MEANHT") min_height = _column_or_none(batch, names, "MINHT") max_height = _column_or_none(batch, names, "MAXHT") lidar_year = _column_or_none(batch, names, "LiDAR_Survey_Year") geometries = shapely.from_wkb(geometry[valid]) geometries = shapely.transform( geometries, to_wgs84.transform, interleaved=False, ) geometries_json = shapely.to_geojson(geometries) valid_indexes = np.flatnonzero(valid) for idx, geometry_json in zip(valid_indexes, geometries_json): properties = { "source": "tow", "tow_id": str(tow_id[idx]) if tow_id is not None else "", "woodland_type": ( str(woodland_type[idx]) if woodland_type is not None else "" ), "area_sqm": _number_or_none(area[idx]), "mean_height_m": ( _number_or_none(mean_height[idx]) if mean_height is not None else None ), "min_height_m": ( _number_or_none(min_height[idx]) if min_height is not None else None ), "max_height_m": ( _number_or_none(max_height[idx]) if max_height is not None else None ), "lidar_year": ( _number_or_none(lidar_year[idx]) if lidar_year is not None else None ), "source_layer": layer, } feature = { "type": "Feature", "geometry": json.loads(geometry_json), "properties": properties, } file.write(json.dumps(feature, separators=(",", ":")) + "\n") feature_count += 1 return feature_count def _append_nfi_geojsonseq( dataset_path: str, output_path: Path, batch_size: int, max_nfi_features: int | None, ) -> int: """Append NFI woodland polygons to the same GeoJSONSeq as the TOW features.""" to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True) layers = _layers(dataset_path, None) print(f"Processing {len(layers)} NFI layer(s): {', '.join(layers)}") columns = [NFI_CATEGORY_COL, NFI_TYPE_COL, NFI_AREA_HA_COL] feature_count = 0 features_seen = 0 with output_path.open("a") as file: for layer in layers: with pyogrio.open_arrow( dataset_path, layer=layer, columns=columns, batch_size=batch_size, use_pyarrow=True, ) as (meta, reader): for batch in reader: if max_nfi_features is not None: remaining = max_nfi_features - features_seen if remaining <= 0: break if batch.num_rows > remaining: batch = batch.slice(0, remaining) features_seen += batch.num_rows names = batch.schema.names geometry_column = _geometry_column(meta, names) category = np.asarray( batch.column(names.index(NFI_CATEGORY_COL)).to_numpy( zero_copy_only=False ), dtype=object, ) geometry = np.asarray( batch.column(names.index(geometry_column)).to_numpy( zero_copy_only=False ), dtype=object, ) valid = category == NFI_WOODLAND_VALUE if not valid.any(): continue woodland_type = _column_or_none(batch, names, NFI_TYPE_COL) area_ha = _column_or_none(batch, names, NFI_AREA_HA_COL) geometries = shapely.from_wkb(geometry[valid]) geometries = shapely.transform( geometries, to_wgs84.transform, interleaved=False, ) geometries_json = shapely.to_geojson(geometries) valid_indexes = np.flatnonzero(valid) for idx, geometry_json in zip(valid_indexes, geometries_json): area_sqm = ( _number_or_none(area_ha[idx] * 10000.0) if area_ha is not None else None ) properties = { "source": "nfi", "tow_id": "", "woodland_type": ( str(woodland_type[idx]) if woodland_type is not None else "" ), "area_sqm": area_sqm, "mean_height_m": None, "min_height_m": None, "max_height_m": None, "lidar_year": None, "source_layer": layer, } feature = { "type": "Feature", "geometry": json.loads(geometry_json), "properties": properties, } file.write(json.dumps(feature, separators=(",", ":")) + "\n") feature_count += 1 return feature_count def build_tree_overlay_tiles( tow_zip: Path, output_path: Path, extract_dir: Path, batch_size: int, layer_names: tuple[str, ...] | None, max_features_per_layer: int | None, min_zoom: int, max_zoom: int, force_extract: bool, use_vsizip: bool, nfi_zip: Path | None = None, nfi_extract_dir: Path = Path("property-data/nfi_woodland_england"), max_nfi_features: int | None = None, ) -> None: tippecanoe = _require_tippecanoe() dataset_path = _tow_dataset_path(tow_zip, extract_dir, force_extract, use_vsizip) output_path.parent.mkdir(parents=True, exist_ok=True) with tempfile.TemporaryDirectory(dir=local_tmp_dir()) as tmp: ndjson_path = Path(tmp) / "trees_outside_woodlands.geojsonseq" feature_count = _write_tree_geojsonseq( dataset_path, ndjson_path, batch_size, layer_names, max_features_per_layer, ) print(f"Writing {feature_count:,} TOW polygon features") if nfi_zip is not None and nfi_zip.exists(): nfi_path = _nfi_dataset_path( nfi_zip, nfi_extract_dir, force_extract, use_vsizip ) nfi_count = _append_nfi_geojsonseq( nfi_path, ndjson_path, batch_size, max_nfi_features, ) print(f"Writing {nfi_count:,} NFI woodland polygon features") elif nfi_zip is not None: print(f"NFI zip not found, skipping woodland union: {nfi_zip}") subprocess.run( [ tippecanoe, "--force", "--output", str(output_path), "--layer", "trees_outside_woodlands", "--minimum-zoom", str(min_zoom), "--maximum-zoom", str(max_zoom), "--coalesce-smallest-as-needed", "--extend-zooms-if-still-dropping", "--temporary-directory", tmp, str(ndjson_path), ], check=True, ) def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--tow-zip", type=Path, required=True) parser.add_argument("--output", type=Path, required=True) parser.add_argument( "--extract-dir", type=Path, default=Path("property-data/fr_tow_v1_all"), help="Directory used to extract the FileGDB", ) parser.add_argument("--batch-size", type=int, default=50_000) parser.add_argument("--layer", action="append", dest="layers") parser.add_argument("--max-features-per-layer", type=int) parser.add_argument("--min-zoom", type=int, default=12) parser.add_argument("--max-zoom", type=int, default=17) parser.add_argument("--force-extract", action="store_true") parser.add_argument("--use-vsizip", action="store_true") parser.add_argument( "--nfi-zip", type=Path, default=None, help="Optional NFI woodland shapefile zip to union into the overlay", ) parser.add_argument( "--nfi-extract-dir", type=Path, default=Path("property-data/nfi_woodland_england"), help="Directory used to extract the NFI zip", ) parser.add_argument("--max-nfi-features", type=int) args = parser.parse_args() build_tree_overlay_tiles( tow_zip=args.tow_zip, output_path=args.output, extract_dir=args.extract_dir, batch_size=args.batch_size, layer_names=tuple(args.layers) if args.layers else None, max_features_per_layer=args.max_features_per_layer, min_zoom=args.min_zoom, max_zoom=args.max_zoom, force_extract=args.force_extract, use_vsizip=args.use_vsizip, nfi_zip=args.nfi_zip, nfi_extract_dir=args.nfi_extract_dir, max_nfi_features=args.max_nfi_features, ) if __name__ == "__main__": main()