From e15ab7dfef6230f498e8605521d1efdf80d13266 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sun, 1 Feb 2026 11:06:50 +0000 Subject: [PATCH] Add noise data --- pipeline/transform/merge.py | 106 ++++++++++++++++++++---------------- 1 file changed, 59 insertions(+), 47 deletions(-) diff --git a/pipeline/transform/merge.py b/pipeline/transform/merge.py index ffbb046..b7aa516 100644 --- a/pipeline/transform/merge.py +++ b/pipeline/transform/merge.py @@ -10,17 +10,14 @@ def _build_wide( poi_proximity_path: Path, journey_times_path: Path, ethnicity_path: Path, - crime_path: Path , + crime_path: Path, noise_path: Path, school_proximity_path: Path, broadband_path: Path, ) -> pl.DataFrame: """Build the wide dataframe by joining epc_pp with all auxiliary data.""" - print("Scanning epc_pp...") wide = pl.scan_parquet(epc_pp_path) - # GPS coordinates + LSOA from ArcGIS - print("Joining GPS coordinates...") arcgis = pl.scan_parquet(arcgis_path).select( pl.col("pcds").alias("postcode"), "lat", @@ -30,7 +27,6 @@ def _build_wide( ) wide = wide.join(arcgis, on="postcode", how="inner") - print("Joining journey times...") journey_times = pl.scan_parquet(journey_times_path).select( "postcode", "public_transport_easy_minutes", @@ -39,12 +35,9 @@ def _build_wide( ) wide = wide.join(journey_times, on="postcode", how="left") - print("Joining IoD scores...") iod = pl.scan_parquet(iod_path) wide = wide.join(iod, left_on="lsoa21", right_on="LSOA code (2021)", how="left") - # Ethnicity by local authority - print("Joining ethnicity data...") ethnicity = pl.scan_parquet(ethnicity_path) wide = wide.join( ethnicity, @@ -53,50 +46,50 @@ def _build_wide( how="left", ) - # Crime stats by LSOA - print("Joining crime data...") crime = pl.scan_parquet(crime_path) wide = wide.join(crime, left_on="lsoa21", right_on="LSOA code", how="left") - print("Joining POI proximity counts...") poi_counts = pl.scan_parquet(poi_proximity_path) wide = wide.join(poi_counts, on="postcode", how="left") - # noise = pl.scan_parquet(noise_path).select( - # "postcode", "road_noise_lden_db", "rail_noise_lden_db", "airport_noise_lden_db" - # ) - # wide = wide.join(noise, on="postcode", how="left") + noise = ( + pl.scan_parquet(noise_path) + .with_columns( + pl.max_horizontal( + "road_noise_lden_db", "rail_noise_lden_db", "airport_noise_lden_db" + ).alias("noise_lden_db"), + ) + .select("postcode", "noise_lden_db") + ) + wide = wide.join(noise, on="postcode", how="left") - print("Joining school proximity counts...") school_proximity = pl.scan_parquet(school_proximity_path) wide = wide.join(school_proximity, on="postcode", how="left") # Broadband: derive max available download speed tier per postcode from # Ofcom availability percentages. Tiers: Gigabit ≥1000, UFBB ≥300, # UFBB(100) ≥100, SFBB ≥30 Mbps. - print("Joining broadband availability...") - broadband = pl.scan_parquet(broadband_path).select( - pl.col("postcode_space").alias("bb_postcode"), - pl.when(pl.col("Gigabit availability (% premises)") > 0).then(1000) - .when(pl.col("UFBB availability (% premises)") > 0).then(300) - .when(pl.col("UFBB (100Mbit/s) availability (% premises)") > 0).then(100) - .when(pl.col("SFBB availability (% premises)") > 0).then(30) - .otherwise(10) - .cast(pl.UInt16) - .alias("max_download_speed"), + broadband = ( + pl.scan_parquet(broadband_path) + .select( + pl.col("postcode_space").alias("bb_postcode"), + pl.when(pl.col("Gigabit availability (% premises)") > 0) + .then(1000) + .when(pl.col("UFBB availability (% premises)") > 0) + .then(300) + .when(pl.col("UFBB (100Mbit/s) availability (% premises)") > 0) + .then(100) + .when(pl.col("SFBB availability (% premises)") > 0) + .then(30) + .otherwise(10) + .cast(pl.UInt16) + .alias("max_download_speed"), + ) + .group_by("bb_postcode") + .agg(pl.col("max_download_speed").max()) ) wide = wide.join(broadband, left_on="postcode", right_on="bb_postcode", how="left") - # Convert construction_age_band to numeric year - wide = wide.with_columns( - pl.col("construction_age_band") - .str.replace("England and Wales: ", "") - .str.replace(" onwards", "") - .str.extract(r"(\d{4})", 1) - .cast(pl.UInt16, strict=False) - .alias("construction_age_band"), - ) - wide = wide.with_columns( pl.when(pl.col("pp_property_type") == pl.col("built_form")) .then(pl.col("pp_property_type")) @@ -109,7 +102,8 @@ def _build_wide( ) wide = ( - wide.filter(pl.col("total_floor_area") > 0).with_columns( + wide.filter(pl.col("total_floor_area") > 0) + .with_columns( (pl.col("latest_price") / pl.col("total_floor_area")) .round(0) .cast(pl.Int32) @@ -137,6 +131,7 @@ def _build_wide( .rename( { "construction_age_band": "Approximate construction age", + "is_construction_date_approximate": "Is construction date approximate", "pp_address": "Address per Property Register", "epc_address": "Address per EPC", "postcode": "Postcode", @@ -152,9 +147,7 @@ def _build_wide( "public_transport_2km": "Public transport within 2km", "latest_price": "Last known price", "number_habitable_rooms": "Rooms (including bedrooms & bathrooms)", - # "road_noise_lden_db": "Road noise Lden (dB)", - # "rail_noise_lden_db": "Rail noise Lden (dB)", - # "airport_noise_lden_db": "Airport noise Lden (dB)", + "noise_lden_db": "Noise (dB)", "good_primary_5km": "Good+ primary schools within 5km", "good_secondary_5km": "Good+ secondary schools within 5km", "max_download_speed": "Max available download speed (Mbps)", @@ -165,6 +158,7 @@ def _build_wide( print("Collecting with streaming engine...") return wide.collect(engine="streaming") + def main(): parser = argparse.ArgumentParser( description="Build wide property dataframe with all joins" @@ -176,7 +170,10 @@ def main(): "--arcgis", type=Path, required=True, help="ArcGIS postcode data parquet file" ) parser.add_argument( - "--iod", type=Path, required=True, help="Index of Deprivation parquet file (optional)" + "--iod", + type=Path, + required=True, + help="Index of Deprivation parquet file (optional)", ) parser.add_argument( "--poi-proximity", @@ -184,22 +181,37 @@ def main(): help="POI proximity counts parquet file (optional)", ) parser.add_argument( - "--journey-times", required=True, type=Path, help="Journey times parquet file (optional)" + "--journey-times", + required=True, + type=Path, + help="Journey times parquet file (optional)", ) parser.add_argument( - "--ethnicity", type=Path, required=True, help="Ethnicity by local authority parquet file (optional)" + "--ethnicity", + type=Path, + required=True, + help="Ethnicity by local authority parquet file (optional)", ) parser.add_argument( - "--crime", type=Path, required=True, help="Crime by LSOA parquet file (optional)" + "--crime", + type=Path, + required=True, + help="Crime by LSOA parquet file (optional)", ) parser.add_argument( - "--noise", type=Path, required=True, help="Road noise by postcode parquet file" + "--noise", type=Path, required=True, help="Road noise by postcode parquet file" ) parser.add_argument( - "--school-proximity", type=Path, required=True, help="School proximity counts parquet file" + "--school-proximity", + type=Path, + required=True, + help="School proximity counts parquet file", ) parser.add_argument( - "--broadband", type=Path, required=True, help="Broadband performance by output area parquet file" + "--broadband", + type=Path, + required=True, + help="Broadband performance by output area parquet file", ) parser.add_argument( "--output", type=Path, required=True, help="Output parquet file path"