"""Build the wide property dataframe from EPC/Price Paid data and auxiliary datasets.

Every input is scanned lazily as parquet and joined onto the EPC/Price Paid
records. The joined frame is filtered, cleaned and renamed to human-readable
column names, then collected once with the polars streaming engine and written
to parquet.
"""

import argparse
from pathlib import Path

import polars as pl

MIN_PRICE = 10_000
MIN_FLOOR_AREA_M2 = 10

# Per-category crime columns summed into the serious-crime total.
_SERIOUS_CRIME_COLS = [
    "Violence and sexual offences (avg/yr)",
    "Robbery (avg/yr)",
    "Burglary (avg/yr)",
    "Possession of weapons (avg/yr)",
]

# Per-category crime columns summed into the minor-crime total.
_MINOR_CRIME_COLS = [
    "Anti-social behaviour (avg/yr)",
    "Criminal damage and arson (avg/yr)",
    "Shoplifting (avg/yr)",
    "Bicycle theft (avg/yr)",
    "Theft from the person (avg/yr)",
    "Other theft (avg/yr)",
    "Vehicle crime (avg/yr)",
    "Public order (avg/yr)",
    "Drugs (avg/yr)",
    "Other crime (avg/yr)",
]

# Noise sources combined (by maximum) into a single noise level per postcode.
_NOISE_COLS = ["road_noise_lden_db", "rail_noise_lden_db", "airport_noise_lden_db"]

# Intermediate / join-key / low-value columns removed before output.
_DROPPED_COLUMNS = [
    "date_of_transfer",
    "inspection_date",
    "floor_height",
    "LSOA name (2021)",
    "Local Authority District code (2024)",
    "Local Authority District name (2024)",
    "Wider Barriers Sub-domain Score",
    "Geographical Barriers Sub-domain Score",
    "Adult Skills Sub-domain Score",
    "Children and Young People Sub-domain Score",
    "Income Deprivation Affecting Older People (IDAOPI) Score (rate)",
    "Income Deprivation Affecting Children Index (IDACI) Score (rate)",
    "Barriers to Housing and Services Score",
    "lsoa21",
    "oa21",
    "pp_property_type",
    "built_form",
]

# Internal column name -> human-readable output column name.
_OUTPUT_COLUMN_NAMES = {
    "construction_age_band": "Approximate construction age",
    "is_construction_date_approximate": "Is construction date approximate",
    "pp_address": "Address per Property Register",
    "epc_address": "Address per EPC",
    "postcode": "Postcode",
    # NOTE(review): "Leashold" is a typo for "Leasehold"; kept because it is the
    # published output column name that downstream readers may depend on.
    "duration": "Leashold/Freehold",
    "current_energy_rating": "Current energy rating",
    "potential_energy_rating": "Potential energy rating",
    "total_floor_area": "Total floor area (sqm)",
    "epc_property_type": "Property type",
    "property_type_built_form": "Property type/built form",
    "restaurants_2km": "Restaurants within 2km",
    "groceries_2km": "Groceries within 2km",
    "parks_2km": "Parks within 2km",
    "public_transport_2km": "Public transport within 2km",
    "latest_price": "Last known price",
    "number_habitable_rooms": "Number of bedrooms & living rooms",
    "noise_lden_db": "Noise (dB)",
    "good_primary_5km": "Good+ primary schools within 5km",
    "good_secondary_5km": "Good+ secondary schools within 5km",
    "max_download_speed": "Max available download speed (Mbps)",
    "serious_crime_avg_yr": "Serious crime (avg/yr)",
    "minor_crime_avg_yr": "Minor crime (avg/yr)",
}


def _join_journey_times(
    wide: pl.LazyFrame,
    journey_times_path: Path,
    destination_name: str,
) -> pl.LazyFrame:
    """Left-join journey times to one destination onto ``wide`` by postcode.

    The journey-times file may contain several rows per postcode. For each
    postcode, the row with the shortest public-transport time is kept (nulls
    ranked last). Its cycling time is taken from that same row.

    Args:
        wide: Frame to extend; must have a ``postcode`` column.
        journey_times_path: Parquet file with ``postcode``,
            ``public_transport_quick_minutes`` and ``cycling_minutes``.
        destination_name: Used to build the output column names.

    Returns:
        ``wide`` plus "Public transport to <dest> (mins)" and
        "Cycling to <dest> (mins)" columns (null where no postcode match).
    """
    pt_col = f"Public transport to {destination_name} (mins)"
    cycle_col = f"Cycling to {destination_name} (mins)"
    journey_times = (
        pl.scan_parquet(journey_times_path)
        .select(
            "postcode",
            pl.col("public_transport_quick_minutes").alias(pt_col),
            pl.col("cycling_minutes").alias(cycle_col),
        )
        .group_by("postcode")
        # Pick the arg-min row inside each group explicitly, rather than relying
        # on a prior global sort surviving into group_by().first().
        .agg(pl.all().sort_by(pt_col, nulls_last=True).first())
    )
    return wide.join(journey_times, on="postcode", how="left")


def _add_crime_totals(wide: pl.LazyFrame) -> pl.LazyFrame:
    """Add ``serious_crime_avg_yr`` / ``minor_crime_avg_yr`` summed per row."""
    # NOTE(review): sum_horizontal ignores nulls by default; check how LSOAs
    # without a crime match are represented in these totals.
    return wide.with_columns(
        pl.sum_horizontal(*_SERIOUS_CRIME_COLS).alias("serious_crime_avg_yr"),
        pl.sum_horizontal(*_MINOR_CRIME_COLS).alias("minor_crime_avg_yr"),
    )


def _load_noise(noise_path: Path) -> pl.LazyFrame:
    """Return ``postcode`` and ``noise_lden_db`` (max of road/rail/airport, 0 if all missing)."""
    return (
        pl.scan_parquet(noise_path)
        .with_columns(
            # NaN → null so max_horizontal ignores missing instead of propagating NaN
            *[pl.col(c).fill_nan(None) for c in _NOISE_COLS],
        )
        .with_columns(
            pl.max_horizontal(*_NOISE_COLS).fill_null(0).alias("noise_lden_db"),
        )
        .select("postcode", "noise_lden_db")
    )


def _load_broadband(broadband_path: Path) -> pl.LazyFrame:
    """Return the best available download-speed tier per postcode.

    The tier is derived from the availability percentages: Gigabit 1000,
    UFBB 300, UFBB(100) 100, SFBB 30 Mbps. It falls back to 10 when no tier
    has availability > 0. The best tier across rows sharing a postcode is
    kept. Output columns are ``bb_postcode`` and ``max_download_speed`` (UInt16).
    """
    return (
        pl.scan_parquet(broadband_path)
        .select(
            pl.col("postcode_space").alias("bb_postcode"),
            pl.when(pl.col("Gigabit availability (% premises)") > 0)
            .then(1000)
            .when(pl.col("UFBB availability (% premises)") > 0)
            .then(300)
            .when(pl.col("UFBB (100Mbit/s) availability (% premises)") > 0)
            .then(100)
            .when(pl.col("SFBB availability (% premises)") > 0)
            .then(30)
            .otherwise(10)
            .cast(pl.UInt16)
            .alias("max_download_speed"),
        )
        .group_by("bb_postcode")
        .agg(pl.col("max_download_speed").max())
    )


def _add_property_type_built_form(wide: pl.LazyFrame) -> pl.LazyFrame:
    """Add ``property_type_built_form`` combining Price Paid type and EPC built form.

    The value is ``pp_property_type`` alone when the two agree or when
    ``built_form`` is missing. Otherwise it is ``"<pp_property_type>/<built_form>"``.
    """
    pp_type = pl.col("pp_property_type")
    built_form = pl.col("built_form")
    return wide.with_columns(
        # Without the is_null() guard, concat_str propagates a null built_form
        # and the known property type would be lost.
        pl.when(built_form.is_null() | (pp_type == built_form))
        .then(pp_type)
        .otherwise(pl.concat_str([pp_type, pl.lit("/"), built_form]))
        .alias("property_type_built_form")
    )


def _clean_and_rename(wide: pl.LazyFrame) -> pl.LazyFrame:
    """Filter implausible sales, null out sentinels, add price/sqm, drop and rename columns."""
    return (
        wide.filter(
            (pl.col("total_floor_area") > MIN_FLOOR_AREA_M2)
            & (pl.col("latest_price") >= MIN_PRICE)
        )
        .with_columns(
            # "U" tenure and "INVALID!" ratings are treated as missing values.
            pl.when(pl.col("duration") == "U")
            .then(None)
            .otherwise(pl.col("duration"))
            .alias("duration"),
            pl.when(pl.col("current_energy_rating") == "INVALID!")
            .then(None)
            .otherwise(pl.col("current_energy_rating"))
            .alias("current_energy_rating"),
        )
        .with_columns(
            (pl.col("latest_price") / pl.col("total_floor_area"))
            .round(0)
            .cast(pl.Int32)
            .alias("Price per sqm"),
        )
        .drop(*_DROPPED_COLUMNS)
        .rename(_OUTPUT_COLUMN_NAMES)
    )


def _build_wide(
    epc_pp_path: Path,
    arcgis_path: Path,
    iod_path: Path,
    poi_proximity_path: Path,
    journey_times_bank_path: Path,
    journey_times_fitzrovia_path: Path,
    ethnicity_path: Path,
    crime_path: Path,
    noise_path: Path,
    school_proximity_path: Path,
    broadband_path: Path,
) -> pl.DataFrame:
    """Build the wide dataframe by joining epc_pp with all auxiliary data.

    Rows whose postcode is absent from the ArcGIS lookup are dropped (inner
    join). All other datasets are left-joined, so unmatched rows get nulls.
    Every path is read unconditionally.

    Returns:
        The collected, cleaned and renamed frame.
    """
    wide = pl.scan_parquet(epc_pp_path)

    arcgis = pl.scan_parquet(arcgis_path).select(
        pl.col("pcds").alias("postcode"),
        "lat",
        pl.col("long").alias("lon"),
        "lsoa21",
        "oa21",
    )
    wide = wide.join(arcgis, on="postcode", how="inner")

    wide = _join_journey_times(wide, journey_times_bank_path, "Bank")
    wide = _join_journey_times(wide, journey_times_fitzrovia_path, "Fitzrovia")

    iod = pl.scan_parquet(iod_path)
    wide = wide.join(iod, left_on="lsoa21", right_on="LSOA code (2021)", how="left")

    # The local-authority code used here comes from the IoD join above.
    ethnicity = pl.scan_parquet(ethnicity_path)
    wide = wide.join(
        ethnicity,
        left_on="Local Authority District code (2024)",
        right_on="Geography_code",
        how="left",
    )

    crime = pl.scan_parquet(crime_path)
    wide = wide.join(crime, left_on="lsoa21", right_on="LSOA code", how="left")
    wide = _add_crime_totals(wide)

    poi_counts = pl.scan_parquet(poi_proximity_path)
    wide = wide.join(poi_counts, on="postcode", how="left")

    wide = wide.join(_load_noise(noise_path), on="postcode", how="left")

    school_proximity = pl.scan_parquet(school_proximity_path)
    wide = wide.join(school_proximity, on="postcode", how="left")

    wide = wide.join(
        _load_broadband(broadband_path),
        left_on="postcode",
        right_on="bb_postcode",
        how="left",
    )

    wide = _add_property_type_built_form(wide)
    wide = _clean_and_rename(wide)

    print("Collecting with streaming engine...")
    return wide.collect(engine="streaming")


def main() -> None:
    """Parse CLI arguments, build the wide dataframe and write it to parquet."""
    parser = argparse.ArgumentParser(
        description="Build wide property dataframe with all joins"
    )
    # Every input is read unconditionally by _build_wide, so all are required;
    # an omitted path would otherwise fail deep inside polars with a None path.
    parser.add_argument(
        "--epc-pp", type=Path, required=True, help="EPC-Price Paid joined parquet file"
    )
    parser.add_argument(
        "--arcgis", type=Path, required=True, help="ArcGIS postcode data parquet file"
    )
    parser.add_argument(
        "--iod",
        type=Path,
        required=True,
        help="Index of Deprivation parquet file",
    )
    parser.add_argument(
        "--poi-proximity",
        type=Path,
        required=True,
        help="POI proximity counts parquet file",
    )
    parser.add_argument(
        "--journey-times-bank",
        type=Path,
        required=True,
        help="Journey times to Bank parquet file",
    )
    parser.add_argument(
        "--journey-times-fitzrovia",
        type=Path,
        required=True,
        help="Journey times to Fitzrovia parquet file",
    )
    parser.add_argument(
        "--ethnicity",
        type=Path,
        required=True,
        help="Ethnicity by local authority parquet file",
    )
    parser.add_argument(
        "--crime",
        type=Path,
        required=True,
        help="Crime by LSOA parquet file",
    )
    parser.add_argument(
        "--noise",
        type=Path,
        required=True,
        help="Road/rail/airport noise by postcode parquet file",
    )
    parser.add_argument(
        "--school-proximity",
        type=Path,
        required=True,
        help="School proximity counts parquet file",
    )
    parser.add_argument(
        "--broadband",
        type=Path,
        required=True,
        help="Broadband availability by postcode parquet file",
    )
    parser.add_argument(
        "--output", type=Path, required=True, help="Output parquet file path"
    )
    args = parser.parse_args()

    wide = _build_wide(
        epc_pp_path=args.epc_pp,
        arcgis_path=args.arcgis,
        iod_path=args.iod,
        poi_proximity_path=args.poi_proximity,
        journey_times_bank_path=args.journey_times_bank,
        journey_times_fitzrovia_path=args.journey_times_fitzrovia,
        ethnicity_path=args.ethnicity,
        crime_path=args.crime,
        noise_path=args.noise,
        school_proximity_path=args.school_proximity,
        broadband_path=args.broadband,
    )

    print(f"Columns: {wide.columns}")
    print(f"Rows: {wide.height}")

    wide.write_parquet(args.output)
    size_mb = args.output.stat().st_size / (1024 * 1024)
    print(f"Wrote {args.output} ({size_mb:.1f} MB)")


if __name__ == "__main__":
    main()