From c84af213e25ec3c122662058c845c10a8309076b Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sun, 1 Feb 2026 13:09:58 +0000 Subject: [PATCH] Fix data loading --- pipeline/download/noise.py | 14 +++++--- pipeline/transform/merge.py | 66 +++++++++++++++++++++++++++++++------ 2 files changed, 65 insertions(+), 15 deletions(-) diff --git a/pipeline/download/noise.py b/pipeline/download/noise.py index ba91ff7..554adfe 100644 --- a/pipeline/download/noise.py +++ b/pipeline/download/noise.py @@ -190,7 +190,8 @@ def sample_noise_at_postcodes( easting: np.ndarray, northing: np.ndarray, label: str, -) -> np.ndarray: + col_name: str, +) -> pl.Series: """Sample noise values from merged tiles at given BNG coordinates.""" print(f"[{label}] Merging {len(tile_paths)} tiles...") datasets = [rasterio.open(p) for p in tile_paths] @@ -223,7 +224,10 @@ def sample_noise_at_postcodes( valid_count = int(np.sum(~np.isnan(noise_db))) print(f"[{label}] Sampled {valid_count:,} / {len(easting):,} postcodes with noise data") - return noise_db + + # Return as masked Series: use null (not NaN) so that Polars max_horizontal + # correctly ignores missing values instead of propagating NaN. + return pl.Series(col_name, noise_db).fill_nan(None) def main() -> None: @@ -264,11 +268,11 @@ def main() -> None: if not tile_paths: print(f"[{label}] WARNING: No tiles downloaded — column will be all null") - noise_db = np.full(len(lat), np.nan, dtype=np.float32) + series = pl.Series(col_name, [None] * len(lat), dtype=pl.Float32) else: - noise_db = sample_noise_at_postcodes(tile_paths, easting, northing, label) + series = sample_noise_at_postcodes(tile_paths, easting, northing, label, col_name) - result = result.with_columns(pl.Series(col_name, noise_db)) + result = result.with_columns(series) result.write_parquet(args.output, compression="zstd") size_mb = args.output.stat().st_size / (1024 * 1024) diff --git a/pipeline/transform/merge.py b/pipeline/transform/merge.py index b7aa516..69a93e2 100644 --- a/pipeline/transform/merge.py +++ b/pipeline/transform/merge.py @@ -2,6 +2,9 @@ import argparse import polars as pl from pathlib import Path +MIN_PRICE = 10_000 +MIN_FLOOR_AREA_M2 = 10 + def _build_wide( epc_pp_path: Path, @@ -27,11 +30,17 @@ def _build_wide( ) wide = wide.join(arcgis, on="postcode", how="inner") - journey_times = pl.scan_parquet(journey_times_path).select( - "postcode", - "public_transport_easy_minutes", - "public_transport_quick_minutes", - "cycling_minutes", + journey_times = ( + pl.scan_parquet(journey_times_path) + .select( + "postcode", + "public_transport_easy_minutes", + "public_transport_quick_minutes", + "cycling_minutes", + ) + .sort("public_transport_quick_minutes", nulls_last=True) + .group_by("postcode") + .first() ) wide = wide.join(journey_times, on="postcode", how="left") @@ -49,15 +58,39 @@ def _build_wide( crime = pl.scan_parquet(crime_path) wide = wide.join(crime, left_on="lsoa21", right_on="LSOA code", how="left") + wide = wide.with_columns( + pl.sum_horizontal( + "Violence and sexual offences (avg/yr)", + "Robbery (avg/yr)", + "Burglary (avg/yr)", + "Possession of weapons (avg/yr)", + ).alias("serious_crime_avg_yr"), + pl.sum_horizontal( + "Anti-social behaviour (avg/yr)", + "Criminal damage and arson (avg/yr)", + "Shoplifting (avg/yr)", + "Bicycle theft (avg/yr)", + "Theft from the person (avg/yr)", + "Other theft (avg/yr)", + "Vehicle crime (avg/yr)", + "Public order (avg/yr)", + "Drugs (avg/yr)", + "Other crime (avg/yr)", + ).alias("minor_crime_avg_yr"), + ) + poi_counts = pl.scan_parquet(poi_proximity_path) wide = wide.join(poi_counts, on="postcode", how="left") + noise_cols = ["road_noise_lden_db", "rail_noise_lden_db", "airport_noise_lden_db"] noise = ( pl.scan_parquet(noise_path) .with_columns( - pl.max_horizontal( - "road_noise_lden_db", "rail_noise_lden_db", "airport_noise_lden_db" - ).alias("noise_lden_db"), + # NaN → null so max_horizontal ignores missing instead of propagating NaN + *[pl.col(c).fill_nan(None) for c in noise_cols], + ) + .with_columns( + pl.max_horizontal(*noise_cols).fill_null(0).alias("noise_lden_db"), ) .select("postcode", "noise_lden_db") ) @@ -102,7 +135,18 @@ def _build_wide( ) wide = ( - wide.filter(pl.col("total_floor_area") > 0) + wide.filter(pl.col("total_floor_area") > MIN_FLOOR_AREA_M2) + .filter(pl.col("latest_price") >= MIN_PRICE) + .with_columns( + pl.when(pl.col("duration") == "U") + .then(None) + .otherwise(pl.col("duration")) + .alias("duration"), + pl.when(pl.col("current_energy_rating") == "INVALID!") + .then(None) + .otherwise(pl.col("current_energy_rating")) + .alias("current_energy_rating"), + ) .with_columns( (pl.col("latest_price") / pl.col("total_floor_area")) .round(0) @@ -146,11 +190,13 @@ def _build_wide( "parks_2km": "Parks within 2km", "public_transport_2km": "Public transport within 2km", "latest_price": "Last known price", - "number_habitable_rooms": "Rooms (including bedrooms & bathrooms)", + "number_habitable_rooms": "Number of bedrooms & living rooms", "noise_lden_db": "Noise (dB)", "good_primary_5km": "Good+ primary schools within 5km", "good_secondary_5km": "Good+ secondary schools within 5km", "max_download_speed": "Max available download speed (Mbps)", + "serious_crime_avg_yr": "Serious crime (avg/yr)", + "minor_crime_avg_yr": "Minor crime (avg/yr)", } ) )