diff --git a/Taskfile.data.yml b/Taskfile.data.yml index a7b340b..adcceef 100644 --- a/Taskfile.data.yml +++ b/Taskfile.data.yml @@ -13,7 +13,7 @@ vars: WIDE_OUTPUT: "{{.DATA_DIR}}/wide.parquet" EPC: "{{.DATA_DIR}}/certificates.csv" JOURNEY_TIMES_BANK: "{{.DATA_DIR}}/journey_times_bank.parquet" - JOURNEY_TIMES_FITZROVIA: "{{.DATA_DIR}}/journey_times_fitzrovia_checkpoint.parquet" + JOURNEY_TIMES_FITZROVIA: "{{.DATA_DIR}}/journey_times_fitzrovia.parquet" ETHNICITY_OUTPUT: "{{.DATA_DIR}}/ethnicity_by_la.parquet" CRIME_DIR: "{{.DATA_DIR}}/crime" CRIME_OUTPUT: "{{.DATA_DIR}}/crime_by_lsoa.parquet" @@ -23,6 +23,8 @@ vars: BROADBAND_OUTPUT: "{{.DATA_DIR}}/broadband.parquet" SCHOOL_PROXIMITY_OUTPUT: "{{.DATA_DIR}}/school_proximity.parquet" POSTCODES_OUTPUT: "{{.DATA_DIR}}/postcodes" + GEOSURE_OUTPUT: "{{.DATA_DIR}}/geosure" + GEOSURE_PARQUET: "{{.DATA_DIR}}/geosure.parquet" tasks: download:tiles: @@ -141,6 +143,13 @@ tasks: cmds: - uv run python -m pipeline.download.postcodes --output {{.POSTCODES_OUTPUT}} + download:geosure: + desc: Download OS GeoSure ground stability data (5km hex grid) + status: + - test -d {{.GEOSURE_OUTPUT}} + cmds: + - uv run python -m pipeline.download.geosure --output {{.GEOSURE_OUTPUT}} + download:noise: desc: Download Defra noise data (road, rail, airport) sampled at postcode centroids deps: @@ -197,6 +206,16 @@ tasks: cmds: - uv run python -m pipeline.transform.school_proximity --ofsted {{.OFSTED_OUTPUT}} --arcgis {{.ARCGIS_OUTPUT}} --output {{.SCHOOL_PROXIMITY_OUTPUT}} + transform:geosure: + desc: Spatial-join GeoSure ground stability data to postcode centroids + deps: + - download:geosure + - download:arcgis + status: + - test -f {{.GEOSURE_PARQUET}} + cmds: + - uv run python -m pipeline.transform.transform_geosure --geosure {{.GEOSURE_OUTPUT}} --arcgis {{.ARCGIS_OUTPUT}} --output {{.GEOSURE_PARQUET}} + download:journey-times: desc: "Fetch TfL journey times: task download:journey-times" deps: @@ -218,6 +237,7 @@ tasks: # - transform:crime # - transform:poi-proximity # - transform:school-proximity + # - transform:geosure # - prompt:journey-times status: - test -f {{.WIDE_OUTPUT}} @@ -235,4 +255,5 @@ tasks: --noise {{.NOISE_OUTPUT}} --school-proximity {{.SCHOOL_PROXIMITY_OUTPUT}} --broadband {{.BROADBAND_OUTPUT}} + --geosure {{.GEOSURE_PARQUET}} --output {{.WIDE_OUTPUT}} diff --git a/pipeline/download/geosure.py b/pipeline/download/geosure.py new file mode 100644 index 0000000..88872ef --- /dev/null +++ b/pipeline/download/geosure.py @@ -0,0 +1,44 @@ +"""Download OS GeoSure ground stability data (5km hex grid). + +Downloads the GB-Hex-5km-GeoSure dataset from Ordnance Survey as an ESRI +Shapefile and extracts it. + +Source: https://osdatahub.os.uk/downloads/open/GeoSure +License: Open Government Licence v3.0 +""" + +import argparse +import tempfile +from pathlib import Path + +from pipeline.utils import download, extract_zip + +URL = "https://api.os.uk/downloads/v1/products/GB-Hex-5km-GeoSure/downloads?area=GB&format=ESRI%C2%AE+Shapefile&redirect" + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Download OS GeoSure ground stability data" + ) + parser.add_argument( + "--output", + type=Path, + required=True, + help="Output directory for extracted shapefile", + ) + args = parser.parse_args() + + with tempfile.TemporaryDirectory() as cache_dir: + zip_path = Path(cache_dir) / "geosure.zip" + + download(URL, zip_path, timeout=300) + extract_zip(zip_path, args.output) + + shp_files = list(args.output.rglob("*.shp")) + print(f"Extracted {len(shp_files)} shapefiles to {args.output}") + for f in shp_files: + print(f" {f.relative_to(args.output)}") + + +if __name__ == "__main__": + main() diff --git a/pipeline/transform/merge.py b/pipeline/transform/merge.py index 5ef6837..6444153 100644 --- a/pipeline/transform/merge.py +++ b/pipeline/transform/merge.py @@ -41,6 +41,7 @@ def _build_wide( noise_path: Path, school_proximity_path: Path, broadband_path: Path, + geosure_path: Path, ) -> pl.DataFrame: """Build the wide dataframe by joining epc_pp with all auxiliary data.""" wide = pl.scan_parquet(epc_pp_path) @@ -136,6 +137,9 @@ def _build_wide( ) wide = wide.join(broadband, left_on="postcode", right_on="bb_postcode", how="left") + geosure = pl.scan_parquet(geosure_path) + wide = wide.join(geosure, on="postcode", how="left") + # Use built_form (Terraced, Semi-detached) when available, otherwise epc_property_type wide = wide.with_columns( pl.when(pl.col("pp_property_type").is_in(["Terraced", "Semi-Detached"])) @@ -208,6 +212,14 @@ def _build_wide( "max_download_speed": "Max available download speed (Mbps)", "serious_crime_avg_yr": "Serious crime (avg/yr)", "minor_crime_avg_yr": "Minor crime (avg/yr)", + "transaction_year": "Transaction year", + "environmental_risk": "Environmental risk", + "collapsible_deposits_risk": "Collapsible deposits risk", + "compressible_ground_risk": "Compressible ground risk", + "landslide_risk": "Landslide risk", + "running_sand_risk": "Running sand risk", + "shrink_swell_risk": "Shrink-swell risk", + "soluble_rocks_risk": "Soluble rocks risk", } ) ) @@ -276,6 +288,12 @@ def main(): required=True, help="Broadband performance by output area parquet file", ) + parser.add_argument( + "--geosure", + type=Path, + required=True, + help="GeoSure ground stability parquet file", + ) parser.add_argument( "--output", type=Path, required=True, help="Output parquet file path" ) @@ -293,6 +311,7 @@ def main(): noise_path=args.noise, school_proximity_path=args.school_proximity, broadband_path=args.broadband, + geosure_path=args.geosure, ) print(f"Columns: {wide.columns}") diff --git a/pipeline/transform/transform_geosure.py b/pipeline/transform/transform_geosure.py new file mode 100644 index 0000000..efef41e --- /dev/null +++ b/pipeline/transform/transform_geosure.py @@ -0,0 +1,131 @@ +"""Spatial-join GeoSure 5km hex grid risk data to postcode centroids. + +Reads six ESRI Shapefiles (one per hazard type), converts polygons from +BNG to WGS84, and queries postcode centroids against them using an STRtree. +Outputs a postcode-level parquet with six risk columns plus an overall max. + +Source: Ordnance Survey GeoSure (Open Data) +""" + +import argparse + +import numpy as np +import polars as pl +from pathlib import Path + +_GEOSURE_RISKS = [ + ("CollapsibleDeposits", "collapsible_deposits_risk"), + ("CompressibleGround", "compressible_ground_risk"), + ("Landslides", "landslide_risk"), + ("RunningSand", "running_sand_risk"), + ("ShrinkSwell", "shrink_swell_risk"), + ("SolubleRocks", "soluble_rocks_risk"), +] + + +def transform(geosure_dir: Path, arcgis_path: Path) -> pl.DataFrame: + """Spatial-join GeoSure 5km hex grid risk data to postcode centroids.""" + import shapefile as shp + from pyproj import Transformer + from shapely import STRtree, points + from shapely.geometry import shape as to_shapely + + to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True) + + print("Loading postcode centroids for GeoSure join...") + postcodes_df = pl.read_parquet(arcgis_path, columns=["pcds", "lat", "long"]) + lats = postcodes_df["lat"].to_numpy() + lons = postcodes_df["long"].to_numpy() + pts = points(np.column_stack([lons, lats])) + + result = pl.DataFrame({"postcode": postcodes_df["pcds"]}) + risk_cols = [] + + for risk_key, col_name in _GEOSURE_RISKS: + shp_files = list(geosure_dir.rglob(f"*_GS_{risk_key}_*.shp")) + if not shp_files: + print(f" Warning: No shapefile found for {risk_key}") + result = result.with_columns(pl.lit(None).cast(pl.UInt8).alias(col_name)) + risk_cols.append(col_name) + continue + + print(f" Reading {shp_files[0].name}...") + reader = shp.Reader(str(shp_files[0])) + + hex_polys = [] + hex_classes = [] + for sr in reader.shapeRecords(): + geo = sr.shape.__geo_interface__ + new_coords = [] + for ring in geo["coordinates"]: + xs, ys = zip(*ring) + t_lons, t_lats = to_wgs84.transform(list(xs), list(ys)) + new_coords.append(list(zip(t_lons, t_lats))) + geo["coordinates"] = new_coords + hex_polys.append(to_shapely(geo)) + hex_classes.append(int(sr.record["CLASS"])) + + classes_arr = np.array(hex_classes, dtype=np.uint8) + + print(f" Querying {len(pts)} postcodes against {len(hex_polys)} hexagons...") + tree = STRtree(hex_polys) + pt_idx, hex_idx = tree.query(pts, predicate="intersects") + + risk_values = np.zeros(len(pts), dtype=np.uint8) + np.maximum.at(risk_values, pt_idx, classes_arr[hex_idx]) + + result = result.with_columns(pl.Series(col_name, risk_values)) + risk_cols.append(col_name) + + # Overall environmental risk = max across all 6 + result = result.with_columns( + pl.max_horizontal(*risk_cols).alias("environmental_risk") + ) + + # Convert 0 → null, 1/2/3 → Low/Moderate/Significant + label_map = {1: "Low", 2: "Moderate", 3: "Significant"} + for col in risk_cols + ["environmental_risk"]: + result = result.with_columns( + pl.col(col) + .replace_strict(label_map, default=None, return_dtype=pl.Utf8) + .alias(col) + ) + + print(f" GeoSure join complete: {result.height} postcodes") + return result + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Spatial-join GeoSure ground stability data to postcode centroids" + ) + parser.add_argument( + "--geosure", + type=Path, + required=True, + help="GeoSure shapefile directory", + ) + parser.add_argument( + "--arcgis", + type=Path, + required=True, + help="ArcGIS postcode data parquet (for lat/lon coordinates)", + ) + parser.add_argument( + "--output", + type=Path, + required=True, + help="Output parquet file path", + ) + args = parser.parse_args() + + result = transform(args.geosure, args.arcgis) + + args.output.parent.mkdir(parents=True, exist_ok=True) + result.write_parquet(args.output, compression="zstd") + size_mb = args.output.stat().st_size / (1024 * 1024) + print(f"Wrote {args.output} ({size_mb:.1f} MB)") + + +if __name__ == "__main__": + main()