"""Spatial-join GeoSure 5km hex grid risk data to postcode centroids. Reads six ESRI Shapefiles (one per hazard type), converts polygons from BNG to WGS84, and queries postcode centroids against them using an STRtree. Outputs a postcode-level parquet with six risk columns plus an overall max. Source: Ordnance Survey GeoSure (Open Data) """ import argparse import numpy as np import polars as pl from pathlib import Path _GEOSURE_RISKS = [ ("CollapsibleDeposits", "collapsible_deposits_risk"), ("CompressibleGround", "compressible_ground_risk"), ("Landslides", "landslide_risk"), ("RunningSand", "running_sand_risk"), ("ShrinkSwell", "shrink_swell_risk"), ("SolubleRocks", "soluble_rocks_risk"), ] def transform(geosure_dir: Path, arcgis_path: Path) -> pl.DataFrame: """Spatial-join GeoSure 5km hex grid risk data to postcode centroids.""" import shapefile as shp from pyproj import Transformer from shapely import STRtree, points from shapely.geometry import shape as to_shapely to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True) print("Loading postcode centroids for GeoSure join...") postcodes_df = pl.read_parquet(arcgis_path, columns=["pcds", "lat", "long"]) lats = postcodes_df["lat"].to_numpy() lons = postcodes_df["long"].to_numpy() pts = points(np.column_stack([lons, lats])) result = pl.DataFrame({"postcode": postcodes_df["pcds"]}) risk_cols = [] for risk_key, col_name in _GEOSURE_RISKS: shp_files = list(geosure_dir.rglob(f"*_GS_{risk_key}_*.shp")) if not shp_files: print(f" Warning: No shapefile found for {risk_key}") result = result.with_columns(pl.lit(None).cast(pl.UInt8).alias(col_name)) risk_cols.append(col_name) continue print(f" Reading {shp_files[0].name}...") reader = shp.Reader(str(shp_files[0])) hex_polys = [] hex_classes = [] for sr in reader.shapeRecords(): geo = sr.shape.__geo_interface__ new_coords = [] for ring in geo["coordinates"]: xs, ys = zip(*ring) t_lons, t_lats = to_wgs84.transform(list(xs), list(ys)) new_coords.append(list(zip(t_lons, t_lats))) geo["coordinates"] = new_coords hex_polys.append(to_shapely(geo)) hex_classes.append(int(sr.record["CLASS"])) classes_arr = np.array(hex_classes, dtype=np.uint8) print(f" Querying {len(pts)} postcodes against {len(hex_polys)} hexagons...") tree = STRtree(hex_polys) pt_idx, hex_idx = tree.query(pts, predicate="intersects") risk_values = np.zeros(len(pts), dtype=np.uint8) np.maximum.at(risk_values, pt_idx, classes_arr[hex_idx]) result = result.with_columns(pl.Series(col_name, risk_values)) risk_cols.append(col_name) # Overall environmental risk = max across all 6 result = result.with_columns( pl.max_horizontal(*risk_cols).alias("environmental_risk") ) # Convert 0 → null, 1/2/3 → Low/Moderate/Significant label_map = {1: "Low", 2: "Moderate", 3: "Significant"} for col in risk_cols + ["environmental_risk"]: result = result.with_columns( pl.col(col) .replace_strict(label_map, default=None, return_dtype=pl.Utf8) .alias(col) ) print(f" GeoSure join complete: {result.height} postcodes") return result def main() -> None: parser = argparse.ArgumentParser( description="Spatial-join GeoSure ground stability data to postcode centroids" ) parser.add_argument( "--geosure", type=Path, required=True, help="GeoSure shapefile directory", ) parser.add_argument( "--arcgis", type=Path, required=True, help="ArcGIS postcode data parquet (for lat/lon coordinates)", ) parser.add_argument( "--output", type=Path, required=True, help="Output parquet file path", ) args = parser.parse_args() result = transform(args.geosure, args.arcgis) args.output.parent.mkdir(parents=True, exist_ok=True) result.write_parquet(args.output, compression="zstd") size_mb = args.output.stat().st_size / (1024 * 1024) print(f"Wrote {args.output} ({size_mb:.1f} MB)") if __name__ == "__main__": main()