"""Spatial-join GeoSure 5km hex grid risk data to postcode centroids. Reads six ESRI Shapefiles (one per hazard type), converts polygons from BNG to WGS84, and queries postcode centroids against them using an STRtree. Outputs a postcode-level parquet with six risk columns plus an overall max. Source: Ordnance Survey GeoSure (Open Data) """ import argparse from pathlib import Path import numpy as np import polars as pl import shapefile as shp from pyproj import Transformer from shapely import STRtree, points from shapely.geometry import shape as to_shapely _GEOSURE_RISKS = [ ("CollapsibleDeposits", "collapsible_deposits_risk"), ("CompressibleGround", "compressible_ground_risk"), ("Landslides", "landslide_risk"), ("RunningSand", "running_sand_risk"), ("ShrinkSwell", "shrink_swell_risk"), ("SolubleRocks", "soluble_rocks_risk"), ] def transform(geosure_dir: Path, arcgis_path: Path) -> pl.DataFrame: """Spatial-join GeoSure 5km hex grid risk data to postcode centroids.""" to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True) print("Loading postcode centroids for GeoSure join...") postcodes_df = pl.read_parquet(arcgis_path, columns=["pcds", "lat", "long"]) lats = postcodes_df["lat"].to_numpy() lons = postcodes_df["long"].to_numpy() pts = points(np.column_stack([lons, lats])) result = pl.DataFrame({"postcode": postcodes_df["pcds"]}) risk_cols = [] for risk_key, col_name in _GEOSURE_RISKS: shp_files = list(geosure_dir.rglob(f"*_GS_{risk_key}_*.shp")) if not shp_files: print(f" Warning: No shapefile found for {risk_key}") result = result.with_columns(pl.lit(None).cast(pl.UInt8).alias(col_name)) risk_cols.append(col_name) continue print(f" Reading {shp_files[0].name}...") reader = shp.Reader(str(shp_files[0])) hex_polys = [] hex_classes = [] for sr in reader.shapeRecords(): geo = sr.shape.__geo_interface__ new_coords = [] for ring in geo["coordinates"]: xs, ys = zip(*ring) t_lons, t_lats = to_wgs84.transform(list(xs), list(ys)) new_coords.append(list(zip(t_lons, t_lats))) geo["coordinates"] = new_coords hex_polys.append(to_shapely(geo)) hex_classes.append(int(sr.record["CLASS"])) classes_arr = np.array(hex_classes, dtype=np.uint8) print(f" Querying {len(pts)} postcodes against {len(hex_polys)} hexagons...") tree = STRtree(hex_polys) pt_idx, hex_idx = tree.query(pts, predicate="intersects") risk_values = np.zeros(len(pts), dtype=np.uint8) np.maximum.at(risk_values, pt_idx, classes_arr[hex_idx]) result = result.with_columns(pl.Series(col_name, risk_values)) risk_cols.append(col_name) # Overall environmental risk = max across all 6 result = result.with_columns( pl.max_horizontal(*risk_cols).alias("environmental_risk") ) # Convert 0 → null, 1/2/3 → Low/Moderate/Significant label_map = {1: "Low", 2: "Moderate", 3: "Significant"} for col in risk_cols + ["environmental_risk"]: result = result.with_columns( pl.col(col) .replace_strict(label_map, default=None, return_dtype=pl.Utf8) .alias(col) ) print(f" GeoSure join complete: {result.height} postcodes") return result def main() -> None: parser = argparse.ArgumentParser( description="Spatial-join GeoSure ground stability data to postcode centroids" ) parser.add_argument( "--geosure", type=Path, required=True, help="GeoSure shapefile directory", ) parser.add_argument( "--arcgis", type=Path, required=True, help="ArcGIS postcode data parquet (for lat/lon coordinates)", ) parser.add_argument( "--output", type=Path, required=True, help="Output parquet file path", ) args = parser.parse_args() result = transform(args.geosure, args.arcgis) args.output.parent.mkdir(parents=True, exist_ok=True) result.write_parquet(args.output, compression="zstd") size_mb = args.output.stat().st_size / (1024 * 1024) print(f"Wrote {args.output} ({size_mb:.1f} MB)") if __name__ == "__main__": main()