"""Join EPC/Price Paid records with auxiliary datasets into one wide parquet."""

import argparse
from pathlib import Path

import polars as pl

# Columns removed from the final output once the joins and derived columns
# that need them have been computed.
_DROPPED_COLUMNS = (
    "date_of_transfer",
    "inspection_date",
    "floor_height",
    "LSOA name (2021)",
    "Local Authority District code (2024)",
    "Local Authority District name (2024)",
    "Wider Barriers Sub-domain Score",
    "Geographical Barriers Sub-domain Score",
    "Adult Skills Sub-domain Score",
    "Children and Young People Sub-domain Score",
    "Income Deprivation Affecting Older People (IDAOPI) Score (rate)",
    "Income Deprivation Affecting Children Index (IDACI) Score (rate)",
    "Barriers to Housing and Services Score",
    "lsoa21",
    "oa21",
    "pp_property_type",
    "built_form",
)

# Internal column name -> human-readable name used in the output file.
_COLUMN_RENAMES = {
    "construction_age_band": "Approximate construction age",
    "pp_address": "Address per Property Register",
    "epc_address": "Address per EPC",
    "postcode": "Postcode",
    # NOTE(review): "Leashold" looks like a typo for "Leasehold". It is kept
    # as-is because the output column name may be relied on downstream.
    "duration": "Leashold/Freehold",
    "current_energy_rating": "Current energy rating",
    "potential_energy_rating": "Potential energy rating",
    "total_floor_area": "Total floor area (sqm)",
    "epc_property_type": "Property type",
    "property_type_built_form": "Property type/built form",
    "restaurants_2km": "Restaurants within 2km",
    "groceries_2km": "Groceries within 2km",
    "parks_2km": "Parks within 2km",
    "public_transport_2km": "Public transport within 2km",
    "latest_price": "Last known price",
    "number_habitable_rooms": "Rooms (including bedrooms & bathrooms)",
    # "road_noise_lden_db": "Road noise Lden (dB)",
    # "rail_noise_lden_db": "Rail noise Lden (dB)",
    # "airport_noise_lden_db": "Airport noise Lden (dB)",
    "good_primary_5km": "Good+ primary schools within 5km",
    "good_secondary_5km": "Good+ secondary schools within 5km",
    "max_download_speed": "Max available download speed (Mbps)",
}

# (flag, help text) for every CLI option. All of them are required paths:
# _build_wide scans every input unconditionally, so a missing one would only
# fail later with a far less helpful error.
_CLI_ARGUMENTS = (
    ("--epc-pp", "EPC-Price Paid joined parquet file"),
    ("--arcgis", "ArcGIS postcode data parquet file"),
    ("--iod", "Index of Deprivation parquet file"),
    ("--poi-proximity", "POI proximity counts parquet file"),
    ("--journey-times", "Journey times parquet file"),
    ("--ethnicity", "Ethnicity by local authority parquet file"),
    ("--crime", "Crime by LSOA parquet file"),
    # NOTE(review): still required although the noise join is currently
    # commented out in _build_wide; kept so existing invocations don't change.
    ("--noise", "Road noise by postcode parquet file"),
    ("--school-proximity", "School proximity counts parquet file"),
    ("--broadband", "Broadband performance by output area parquet file"),
    ("--output", "Output parquet file path"),
)


def _build_wide(
    epc_pp_path: Path,
    arcgis_path: Path,
    iod_path: Path,
    poi_proximity_path: Path,
    journey_times_path: Path,
    ethnicity_path: Path,
    crime_path: Path,
    noise_path: Path,
    school_proximity_path: Path,
    broadband_path: Path,
) -> pl.DataFrame:
    """Build the wide dataframe by joining epc_pp with all auxiliary data.

    Every input is scanned lazily and the whole plan is collected once at the
    end with the streaming engine.

    Args:
        epc_pp_path: Base parquet file; all other inputs are joined onto it.
        arcgis_path: Parquet providing ``pcds``, ``lat``, ``long``, ``lsoa21``
            and ``oa21``. Joined with an inner join on postcode, so base rows
            without a match are dropped.
        iod_path: Parquet joined on ``LSOA code (2021)`` against ``lsoa21``.
        poi_proximity_path: Parquet joined on ``postcode``.
        journey_times_path: Parquet providing the journey-time columns,
            joined on ``postcode``.
        ethnicity_path: Parquet joined on ``Geography_code`` against
            ``Local Authority District code (2024)``.
        crime_path: Parquet joined on ``LSOA code`` against ``lsoa21``.
        noise_path: Currently unused; the noise join is commented out below.
        school_proximity_path: Parquet joined on ``postcode``.
        broadband_path: Parquet with ``postcode_space`` and the availability
            percentage columns used to derive ``max_download_speed``.

    Returns:
        The collected dataframe with helper columns dropped and the remaining
        columns renamed according to ``_COLUMN_RENAMES``.
    """
    print("Scanning epc_pp...")
    wide = pl.scan_parquet(epc_pp_path)

    # GPS coordinates + LSOA from ArcGIS
    print("Joining GPS coordinates...")
    arcgis = pl.scan_parquet(arcgis_path).select(
        pl.col("pcds").alias("postcode"),
        "lat",
        pl.col("long").alias("lon"),
        "lsoa21",
        "oa21",
    )
    wide = wide.join(arcgis, on="postcode", how="inner")

    print("Joining journey times...")
    journey_times = pl.scan_parquet(journey_times_path).select(
        "postcode",
        "public_transport_easy_minutes",
        "public_transport_quick_minutes",
        "cycling_minutes",
    )
    wide = wide.join(journey_times, on="postcode", how="left")

    print("Joining IoD scores...")
    iod = pl.scan_parquet(iod_path)
    wide = wide.join(iod, left_on="lsoa21", right_on="LSOA code (2021)", how="left")

    # Ethnicity by local authority.
    # NOTE(review): the left key is presumably supplied by the IoD frame
    # joined just above, so this join must stay after it -- confirm.
    print("Joining ethnicity data...")
    ethnicity = pl.scan_parquet(ethnicity_path)
    wide = wide.join(
        ethnicity,
        left_on="Local Authority District code (2024)",
        right_on="Geography_code",
        how="left",
    )

    # Crime stats by LSOA
    print("Joining crime data...")
    crime = pl.scan_parquet(crime_path)
    wide = wide.join(crime, left_on="lsoa21", right_on="LSOA code", how="left")

    print("Joining POI proximity counts...")
    poi_counts = pl.scan_parquet(poi_proximity_path)
    wide = wide.join(poi_counts, on="postcode", how="left")

    # NOTE: noise join is disabled; noise_path is accepted but not read.
    # noise = pl.scan_parquet(noise_path).select(
    #     "postcode", "road_noise_lden_db", "rail_noise_lden_db", "airport_noise_lden_db"
    # )
    # wide = wide.join(noise, on="postcode", how="left")

    print("Joining school proximity counts...")
    school_proximity = pl.scan_parquet(school_proximity_path)
    wide = wide.join(school_proximity, on="postcode", how="left")

    # Broadband: derive max available download speed tier per postcode from
    # Ofcom availability percentages. Tiers: Gigabit ≥1000, UFBB ≥300,
    # UFBB(100) ≥100, SFBB ≥30 Mbps.
    # The first tier with any availability (> 0) wins; 10 is the fallback
    # when no tier matches.
    print("Joining broadband availability...")
    max_download_speed = (
        pl.when(pl.col("Gigabit availability (% premises)") > 0)
        .then(1000)
        .when(pl.col("UFBB availability (% premises)") > 0)
        .then(300)
        .when(pl.col("UFBB (100Mbit/s) availability (% premises)") > 0)
        .then(100)
        .when(pl.col("SFBB availability (% premises)") > 0)
        .then(30)
        .otherwise(10)
        .cast(pl.UInt16)
        .alias("max_download_speed")
    )
    broadband = pl.scan_parquet(broadband_path).select(
        pl.col("postcode_space").alias("bb_postcode"),
        max_download_speed,
    )
    wide = wide.join(broadband, left_on="postcode", right_on="bb_postcode", how="left")

    # Convert construction_age_band to numeric year: the first four-digit
    # number found in the band text. The non-strict cast yields null instead
    # of raising when nothing usable was extracted.
    wide = wide.with_columns(
        pl.col("construction_age_band")
        .str.replace("England and Wales: ", "")
        .str.replace(" onwards", "")
        .str.extract(r"(\d{4})", 1)
        .cast(pl.UInt16, strict=False)
        .alias("construction_age_band"),
    )

    # Combine the two property descriptors as "<pp_property_type>/<built_form>",
    # collapsing to a single value when both are identical.
    wide = wide.with_columns(
        pl.when(pl.col("pp_property_type") == pl.col("built_form"))
        .then(pl.col("pp_property_type"))
        .otherwise(
            pl.concat_str(
                [pl.col("pp_property_type"), pl.lit("/"), pl.col("built_form")]
            )
        )
        .alias("property_type_built_form")
    )

    wide = (
        # Keep only rows with a positive floor area so the division below is
        # well defined.
        wide.filter(pl.col("total_floor_area") > 0)
        .with_columns(
            (pl.col("latest_price") / pl.col("total_floor_area"))
            .round(0)
            .cast(pl.Int32)
            .alias("Price per sqm"),
        )
        .drop(*_DROPPED_COLUMNS)
        .rename(_COLUMN_RENAMES)
    )

    print("Collecting with streaming engine...")
    return wide.collect(engine="streaming")


def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser; every option is a required ``Path``."""
    parser = argparse.ArgumentParser(
        description="Build wide property dataframe with all joins"
    )
    for flag, help_text in _CLI_ARGUMENTS:
        parser.add_argument(flag, type=Path, required=True, help=help_text)
    return parser


def main():
    """Parse CLI arguments, build the wide dataframe and write it to parquet."""
    args = _build_parser().parse_args()

    wide = _build_wide(
        epc_pp_path=args.epc_pp,
        arcgis_path=args.arcgis,
        iod_path=args.iod,
        poi_proximity_path=args.poi_proximity,
        journey_times_path=args.journey_times,
        ethnicity_path=args.ethnicity,
        crime_path=args.crime,
        noise_path=args.noise,
        school_proximity_path=args.school_proximity,
        broadband_path=args.broadband,
    )

    print(f"Columns: {wide.columns}")
    print(f"Rows: {wide.height}")

    wide.write_parquet(args.output)
    size_mb = args.output.stat().st_size / (1024 * 1024)
    print(f"Wrote {args.output} ({size_mb:.1f} MB)")


if __name__ == "__main__":
    main()