# perfect-postcode/pipeline/transform/merge.py
#
# File-viewer metadata captured with this copy: 290 lines, 9.7 KiB, Python.

import argparse
import polars as pl
from pathlib import Path
# Inclusive lower bound on ``latest_price``: `_build_wide` keeps only rows
# where ``latest_price >= MIN_PRICE``.
MIN_PRICE = 10_000
# Exclusive lower bound on ``total_floor_area`` (square metres, going by the
# constant's name): `_build_wide` keeps only rows where
# ``total_floor_area > MIN_FLOOR_AREA_M2``.
MIN_FLOOR_AREA_M2 = 10
def _build_wide(
    epc_pp_path: Path,
    arcgis_path: Path,
    iod_path: Path,
    poi_proximity_path: Path,
    journey_times_path: Path,
    ethnicity_path: Path,
    crime_path: Path,
    noise_path: Path,
    school_proximity_path: Path,
    broadband_path: Path,
) -> pl.DataFrame:
    """Assemble the wide property table from the EPC/Price Paid base data.

    Every input is scanned lazily as parquet. The base table is inner-joined
    to the ArcGIS postcode lookup (rows whose postcode has no match are
    dropped) and then left-joined, in this order, to: journey times,
    Index of Deprivation, ethnicity, crime, POI proximity, noise, school
    proximity and broadband. Derived columns are added, rows below the
    floor-area / price thresholds are filtered out, helper columns are
    dropped and the remaining columns get their display names.

    Args:
        epc_pp_path: Base table; joined on ``postcode``.
        arcgis_path: Postcode lookup providing ``lat``, ``lon``, ``lsoa21``
            and ``oa21``.
        iod_path: Joined on ``lsoa21`` == ``LSOA code (2021)``.
        poi_proximity_path: Joined on ``postcode``.
        journey_times_path: Reduced to one row per postcode, then joined on
            ``postcode``.
        ethnicity_path: Joined on ``Local Authority District code (2024)``
            == ``Geography_code``.
        crime_path: Joined on ``lsoa21`` == ``LSOA code``.
        noise_path: Collapsed to a single ``noise_lden_db`` per row, then
            joined on ``postcode``.
        school_proximity_path: Joined on ``postcode``.
        broadband_path: Reduced to a speed tier per ``postcode_space``,
            then joined on ``postcode``.

    Returns:
        The collected (eager) dataframe, executed with the streaming engine.
    """

    def null_if_equal(column: str, sentinel: str) -> pl.Expr:
        """Return ``column`` with ``sentinel`` values replaced by null."""
        return (
            pl.when(pl.col(column) == sentinel)
            .then(None)
            .otherwise(pl.col(column))
            .alias(column)
        )

    # --- Postcode lookup (inner join: unmatched postcodes are dropped) ----
    postcode_lookup = pl.scan_parquet(arcgis_path).select(
        pl.col("pcds").alias("postcode"),
        "lat",
        pl.col("long").alias("lon"),
        "lsoa21",
        "oa21",
    )
    frame = pl.scan_parquet(epc_pp_path).join(
        postcode_lookup, on="postcode", how="inner"
    )

    # --- Journey times: one row per postcode ------------------------------
    # Sorting ascending (nulls last) before taking the first row per group is
    # meant to keep the entry with the smallest quick public-transport time.
    journey_columns = [
        "postcode",
        "public_transport_easy_minutes",
        "public_transport_quick_minutes",
        "cycling_minutes",
    ]
    fastest_journeys = (
        pl.scan_parquet(journey_times_path)
        .select(journey_columns)
        .sort("public_transport_quick_minutes", nulls_last=True)
        .group_by("postcode")
        .first()
    )

    # --- Crime roll-ups ----------------------------------------------------
    serious_offences = (
        "Violence and sexual offences (avg/yr)",
        "Robbery (avg/yr)",
        "Burglary (avg/yr)",
        "Possession of weapons (avg/yr)",
    )
    minor_offences = (
        "Anti-social behaviour (avg/yr)",
        "Criminal damage and arson (avg/yr)",
        "Shoplifting (avg/yr)",
        "Bicycle theft (avg/yr)",
        "Theft from the person (avg/yr)",
        "Other theft (avg/yr)",
        "Vehicle crime (avg/yr)",
        "Public order (avg/yr)",
        "Drugs (avg/yr)",
        "Other crime (avg/yr)",
    )

    frame = (
        frame.join(fastest_journeys, on="postcode", how="left")
        .join(
            pl.scan_parquet(iod_path),
            left_on="lsoa21",
            right_on="LSOA code (2021)",
            how="left",
        )
        .join(
            pl.scan_parquet(ethnicity_path),
            left_on="Local Authority District code (2024)",
            right_on="Geography_code",
            how="left",
        )
        .join(
            pl.scan_parquet(crime_path),
            left_on="lsoa21",
            right_on="LSOA code",
            how="left",
        )
        .with_columns(
            pl.sum_horizontal(*serious_offences).alias("serious_crime_avg_yr"),
            pl.sum_horizontal(*minor_offences).alias("minor_crime_avg_yr"),
        )
    )

    # --- POI proximity -----------------------------------------------------
    frame = frame.join(
        pl.scan_parquet(poi_proximity_path), on="postcode", how="left"
    )

    # --- Noise: loudest of the road / rail / airport readings --------------
    noise_columns = [
        "road_noise_lden_db",
        "rail_noise_lden_db",
        "airport_noise_lden_db",
    ]
    loudest_noise = (
        pl.scan_parquet(noise_path)
        # NaN -> null so max_horizontal ignores missing readings instead of
        # propagating NaN.
        .with_columns([pl.col(name).fill_nan(None) for name in noise_columns])
        .with_columns(
            pl.max_horizontal(*noise_columns).fill_null(0).alias("noise_lden_db")
        )
        .select("postcode", "noise_lden_db")
    )
    frame = frame.join(loudest_noise, on="postcode", how="left")

    # --- School proximity --------------------------------------------------
    frame = frame.join(
        pl.scan_parquet(school_proximity_path), on="postcode", how="left"
    )

    # --- Broadband: best available download tier per postcode --------------
    # Derived from Ofcom availability percentages, checked fastest first.
    # Tiers: Gigabit >=1000, UFBB >=300, UFBB(100) >=100, SFBB >=30 Mbps;
    # anything else falls back to 10.
    speed_tiers = (
        ("Gigabit availability (% premises)", 1000),
        ("UFBB availability (% premises)", 300),
        ("UFBB (100Mbit/s) availability (% premises)", 100),
        ("SFBB availability (% premises)", 30),
    )
    (top_column, top_speed), *slower_tiers = speed_tiers
    tier_ladder = pl.when(pl.col(top_column) > 0).then(top_speed)
    for availability_column, speed in slower_tiers:
        tier_ladder = tier_ladder.when(pl.col(availability_column) > 0).then(speed)
    speed_tier = (
        tier_ladder.otherwise(10).cast(pl.UInt16).alias("max_download_speed")
    )
    best_broadband = (
        pl.scan_parquet(broadband_path)
        .select(pl.col("postcode_space").alias("bb_postcode"), speed_tier)
        .group_by("bb_postcode")
        .agg(pl.col("max_download_speed").max())
    )
    frame = frame.join(
        best_broadband, left_on="postcode", right_on="bb_postcode", how="left"
    )

    # --- Derived columns ---------------------------------------------------
    # Use the bare property type when it equals the built form, otherwise
    # "<property type>/<built form>".
    sale_type = pl.col("pp_property_type")
    built_form = pl.col("built_form")
    combined_type = (
        pl.when(sale_type == built_form)
        .then(sale_type)
        .otherwise(pl.concat_str([sale_type, pl.lit("/"), built_form]))
        .alias("property_type_built_form")
    )
    price_per_sqm = (
        (pl.col("latest_price") / pl.col("total_floor_area"))
        .round(0)
        .cast(pl.Int32)
        .alias("Price per sqm")
    )

    # Columns only needed for joins/derivations, or not wanted in the output.
    discarded_columns = (
        "date_of_transfer",
        "inspection_date",
        "floor_height",
        "LSOA name (2021)",
        "Local Authority District code (2024)",
        "Local Authority District name (2024)",
        "Wider Barriers Sub-domain Score",
        "Geographical Barriers Sub-domain Score",
        "Adult Skills Sub-domain Score",
        "Children and Young People Sub-domain Score",
        "Income Deprivation Affecting Older People (IDAOPI) Score (rate)",
        "Income Deprivation Affecting Children Index (IDACI) Score (rate)",
        "Barriers to Housing and Services Score",
        "lsoa21",
        "oa21",
        "pp_property_type",
        "built_form",
    )

    # Internal column name -> display name used in the output file.
    # NOTE(review): "Leashold/Freehold" looks like a typo for "Leasehold";
    # left unchanged because consumers may rely on the exact name - confirm.
    display_names = {
        "construction_age_band": "Approximate construction age",
        "is_construction_date_approximate": "Is construction date approximate",
        "pp_address": "Address per Property Register",
        "epc_address": "Address per EPC",
        "postcode": "Postcode",
        "duration": "Leashold/Freehold",
        "current_energy_rating": "Current energy rating",
        "potential_energy_rating": "Potential energy rating",
        "total_floor_area": "Total floor area (sqm)",
        "epc_property_type": "Property type",
        "property_type_built_form": "Property type/built form",
        "restaurants_2km": "Restaurants within 2km",
        "groceries_2km": "Groceries within 2km",
        "parks_2km": "Parks within 2km",
        "public_transport_2km": "Public transport within 2km",
        "latest_price": "Last known price",
        "number_habitable_rooms": "Number of bedrooms & living rooms",
        "noise_lden_db": "Noise (dB)",
        "good_primary_5km": "Good+ primary schools within 5km",
        "good_secondary_5km": "Good+ secondary schools within 5km",
        "max_download_speed": "Max available download speed (Mbps)",
        "serious_crime_avg_yr": "Serious crime (avg/yr)",
        "minor_crime_avg_yr": "Minor crime (avg/yr)",
    }

    frame = (
        frame.with_columns(combined_type)
        .filter(pl.col("total_floor_area") > MIN_FLOOR_AREA_M2)
        .filter(pl.col("latest_price") >= MIN_PRICE)
        # "U" durations and "INVALID!" ratings are sentinel values -> null.
        .with_columns(
            null_if_equal("duration", "U"),
            null_if_equal("current_energy_rating", "INVALID!"),
        )
        .with_columns(price_per_sqm)
        .drop(*discarded_columns)
        .rename(display_names)
    )

    print("Collecting with streaming engine...")
    return frame.collect(engine="streaming")
def main() -> None:
    """Parse the command line, build the wide dataframe and write it out.

    Every path argument is mandatory. ``_build_wide`` scans each input file
    unconditionally, so an omitted flag (previously possible for
    ``--poi-proximity``) would reach it as ``None`` instead of being
    rejected up front; argparse now reports the missing flag itself.

    Raises:
        SystemExit: Raised by argparse (exit status 2) when a required
            argument is missing or malformed.
    """
    parser = argparse.ArgumentParser(
        description="Build wide property dataframe with all joins"
    )

    # (flag, help text) for every path argument, in the order they should
    # appear in ``--help``. All of them are required ``Path`` values.
    path_arguments = (
        ("--epc-pp", "EPC-Price Paid joined parquet file"),
        ("--arcgis", "ArcGIS postcode data parquet file"),
        ("--iod", "Index of Deprivation parquet file"),
        ("--poi-proximity", "POI proximity counts parquet file"),
        ("--journey-times", "Journey times parquet file"),
        ("--ethnicity", "Ethnicity by local authority parquet file"),
        ("--crime", "Crime by LSOA parquet file"),
        ("--noise", "Road noise by postcode parquet file"),
        ("--school-proximity", "School proximity counts parquet file"),
        ("--broadband", "Broadband performance by output area parquet file"),
        ("--output", "Output parquet file path"),
    )
    for flag, help_text in path_arguments:
        parser.add_argument(flag, type=Path, required=True, help=help_text)

    args = parser.parse_args()

    wide = _build_wide(
        epc_pp_path=args.epc_pp,
        arcgis_path=args.arcgis,
        iod_path=args.iod,
        poi_proximity_path=args.poi_proximity,
        journey_times_path=args.journey_times,
        ethnicity_path=args.ethnicity,
        crime_path=args.crime,
        noise_path=args.noise,
        school_proximity_path=args.school_proximity,
        broadband_path=args.broadband,
    )

    # Brief summary of the result before persisting it.
    print(f"Columns: {wide.columns}")
    print(f"Rows: {wide.height}")

    wide.write_parquet(args.output)
    size_mb = args.output.stat().st_size / (1024 * 1024)
    print(f"Wrote {args.output} ({size_mb:.1f} MB)")


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()