perfect-postcode/pipeline/transform/transform_geosure.py

131 lines
4.3 KiB
Python

"""Spatial-join GeoSure 5km hex grid risk data to postcode centroids.
Reads six ESRI Shapefiles (one per hazard type), converts polygons from
BNG to WGS84, and queries postcode centroids against them using an STRtree.
Outputs a postcode-level parquet with six risk columns plus an overall max.
Source: Ordnance Survey GeoSure (Open Data)
"""
import argparse
import numpy as np
import polars as pl
from pathlib import Path
_GEOSURE_RISKS = [
("CollapsibleDeposits", "collapsible_deposits_risk"),
("CompressibleGround", "compressible_ground_risk"),
("Landslides", "landslide_risk"),
("RunningSand", "running_sand_risk"),
("ShrinkSwell", "shrink_swell_risk"),
("SolubleRocks", "soluble_rocks_risk"),
]
def transform(geosure_dir: Path, arcgis_path: Path) -> pl.DataFrame:
"""Spatial-join GeoSure 5km hex grid risk data to postcode centroids."""
import shapefile as shp
from pyproj import Transformer
from shapely import STRtree, points
from shapely.geometry import shape as to_shapely
to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True)
print("Loading postcode centroids for GeoSure join...")
postcodes_df = pl.read_parquet(arcgis_path, columns=["pcds", "lat", "long"])
lats = postcodes_df["lat"].to_numpy()
lons = postcodes_df["long"].to_numpy()
pts = points(np.column_stack([lons, lats]))
result = pl.DataFrame({"postcode": postcodes_df["pcds"]})
risk_cols = []
for risk_key, col_name in _GEOSURE_RISKS:
shp_files = list(geosure_dir.rglob(f"*_GS_{risk_key}_*.shp"))
if not shp_files:
print(f" Warning: No shapefile found for {risk_key}")
result = result.with_columns(pl.lit(None).cast(pl.UInt8).alias(col_name))
risk_cols.append(col_name)
continue
print(f" Reading {shp_files[0].name}...")
reader = shp.Reader(str(shp_files[0]))
hex_polys = []
hex_classes = []
for sr in reader.shapeRecords():
geo = sr.shape.__geo_interface__
new_coords = []
for ring in geo["coordinates"]:
xs, ys = zip(*ring)
t_lons, t_lats = to_wgs84.transform(list(xs), list(ys))
new_coords.append(list(zip(t_lons, t_lats)))
geo["coordinates"] = new_coords
hex_polys.append(to_shapely(geo))
hex_classes.append(int(sr.record["CLASS"]))
classes_arr = np.array(hex_classes, dtype=np.uint8)
print(f" Querying {len(pts)} postcodes against {len(hex_polys)} hexagons...")
tree = STRtree(hex_polys)
pt_idx, hex_idx = tree.query(pts, predicate="intersects")
risk_values = np.zeros(len(pts), dtype=np.uint8)
np.maximum.at(risk_values, pt_idx, classes_arr[hex_idx])
result = result.with_columns(pl.Series(col_name, risk_values))
risk_cols.append(col_name)
# Overall environmental risk = max across all 6
result = result.with_columns(
pl.max_horizontal(*risk_cols).alias("environmental_risk")
)
# Convert 0 → null, 1/2/3 → Low/Moderate/Significant
label_map = {1: "Low", 2: "Moderate", 3: "Significant"}
for col in risk_cols + ["environmental_risk"]:
result = result.with_columns(
pl.col(col)
.replace_strict(label_map, default=None, return_dtype=pl.Utf8)
.alias(col)
)
print(f" GeoSure join complete: {result.height} postcodes")
return result
def main() -> None:
parser = argparse.ArgumentParser(
description="Spatial-join GeoSure ground stability data to postcode centroids"
)
parser.add_argument(
"--geosure",
type=Path,
required=True,
help="GeoSure shapefile directory",
)
parser.add_argument(
"--arcgis",
type=Path,
required=True,
help="ArcGIS postcode data parquet (for lat/lon coordinates)",
)
parser.add_argument(
"--output",
type=Path,
required=True,
help="Output parquet file path",
)
args = parser.parse_args()
result = transform(args.geosure, args.arcgis)
args.output.parent.mkdir(parents=True, exist_ok=True)
result.write_parquet(args.output, compression="zstd")
size_mb = args.output.stat().st_size / (1024 * 1024)
print(f"Wrote {args.output} ({size_mb:.1f} MB)")
if __name__ == "__main__":
main()