perfect-postcode/pipeline/transform/merge.py
2026-01-31 13:07:09 +00:00

151 lines
4.9 KiB
Python

import argparse
import polars as pl
from pathlib import Path
def _is_available(path: Path | None, label: str) -> bool:
    """Return True when an optional input file was supplied and exists on disk."""
    if path is None:
        return False
    if not path.exists():
        # Make the skip visible: a mistyped path would otherwise silently
        # produce an output that lacks the corresponding columns.
        print(f"  Skipping {label}: {path} not found")
        return False
    return True


def _build_wide(
    epc_pp_path: Path,
    arcgis_path: Path,
    iod_path: Path | None,
    poi_proximity_path: Path | None,
    journey_times_path: Path | None,
) -> pl.DataFrame:
    """Build the wide dataframe by joining epc_pp with all auxiliary data.

    Args:
        epc_pp_path: Parquet file with the base rows. Must provide the
            ``postcode``, ``latest_price`` and ``total_floor_area`` columns.
        arcgis_path: Parquet file providing ``pcds``, ``lat``, ``long`` and
            ``lsoa21``; inner-joined on postcode, so base rows without a
            matching postcode are removed.
        iod_path: Optional parquet file left-joined on
            ``lsoa21`` == ``LSOA code (2021)``.
        poi_proximity_path: Optional parquet file left-joined on ``postcode``.
        journey_times_path: Optional parquet file left-joined on ``postcode``;
            only the three journey-time columns are taken from it.

    Returns:
        The joined frame with a ``Price per sqm`` column added, helper and
        unused columns removed, and the remaining score columns renamed to
        their display names.

    Optional inputs that are ``None`` or missing on disk are skipped; the
    final drop/rename step only touches columns that are actually present, so
    skipping an optional input no longer fails on its absent columns.
    """
    print("Loading epc_pp...")
    wide = pl.read_parquet(epc_pp_path)
    print(f"  {wide.shape[0]:,} rows, {wide.estimated_size('mb'):.1f} MB")

    # GPS coordinates + LSOA from ArcGIS
    print("Joining GPS coordinates...")
    arcgis = pl.read_parquet(arcgis_path).select(
        pl.col("pcds").alias("postcode"),
        "lat",
        pl.col("long").alias("lon"),
        "lsoa21",
    )
    wide = wide.join(arcgis, on="postcode", how="inner")
    print(
        f"  {wide.shape[0]:,} rows after GPS join, {wide.estimated_size('mb'):.1f} MB"
    )

    # Journey times (optional)
    if _is_available(journey_times_path, "journey times"):
        print("Joining journey times...")
        journey_times = pl.read_parquet(journey_times_path).select(
            "postcode",
            "public_transport_easy_minutes",
            "public_transport_quick_minutes",
            "cycling_minutes",
        )
        wide = wide.join(journey_times, on="postcode", how="left")
        print(f"  {wide.estimated_size('mb'):.1f} MB after journey times")

    # Index of Deprivation (optional)
    if _is_available(iod_path, "IoD scores"):
        print("Joining IoD scores...")
        iod = pl.read_parquet(iod_path)
        wide = wide.join(iod, left_on="lsoa21", right_on="LSOA code (2021)", how="left")
        print(f"  {wide.estimated_size('mb'):.1f} MB after IoD")

    # POI proximity counts (optional, pre-computed per postcode)
    if _is_available(poi_proximity_path, "POI proximity counts"):
        print("Joining POI proximity counts...")
        poi_counts = pl.read_parquet(poi_proximity_path)
        wide = wide.join(poi_counts, on="postcode", how="left")
        print(f"  {wide.estimated_size('mb'):.1f} MB after POI counts")

    # Convert construction_age_band to a numeric year: the first four-digit
    # number in the band label; values without one become null (strict=False).
    if "construction_age_band" in wide.columns:
        wide = wide.with_columns(
            pl.col("construction_age_band")
            .str.replace("England and Wales: ", "")
            .str.replace(" onwards", "")
            .str.extract(r"(\d{4})", 1)
            .cast(pl.UInt16, strict=False)
            .alias("construction_age_band"),
        )

    # Derived columns
    # NOTE(review): a zero total_floor_area presumably yields inf/NaN here —
    # confirm whether upstream filtering rules that out.
    wide = wide.with_columns(
        (pl.col("latest_price") / pl.col("total_floor_area")).alias("Price per sqm"),
    )

    unwanted = (
        "date_of_transfer",
        "inspection_date",
        "floor_height",
        "lsoa21",
        "LSOA code (2021)",
        "Local Authority District code (2024)",
        "Local Authority District name (2024)",
        "imd_score",
        "housing_barriers_score",
        "idaci_score",
        "idaopi_score",
        "children_young_people_score",
        "adult_skills_score",
        "geographical_barriers_score",
        "wider_barriers_score",
    )
    display_names = {
        "construction_age_band": "Approximate construction age",
        "income_score": "Income Score (rate)",
        "employment_score": "Employment Score (rate)",
        "education_score": "Education, Skills and Training Score",
        "health_score": "Health Deprivation and Disability Score",
        "crime_score": "Crime Score",
    }

    # Several of these columns presumably come from the optional inputs, and
    # the right-hand join key may already have been coalesced away by the
    # join, so restrict both operations to columns that exist instead of
    # letting a strict drop/rename raise on an absent name.
    present = set(wide.columns)
    wide = wide.drop([name for name in unwanted if name in present])
    wide = wide.rename(
        {old: new for old, new in display_names.items() if old in present}
    )
    return wide
def main():
    """Command-line entry point: parse paths, build the wide frame, write it.

    Reads the required ``--epc-pp``, ``--arcgis`` and ``--output`` paths plus
    the optional ``--iod``, ``--poi-proximity`` and ``--journey-times`` paths,
    delegates the joins to ``_build_wide``, prints the resulting columns and
    row count, writes the frame as parquet and reports the file size on disk.
    """
    cli = argparse.ArgumentParser(
        description="Build wide property dataframe with all joins"
    )

    # (flag, required?, help text) — listed in the order shown by --help.
    option_table = (
        ("--epc-pp", True, "EPC-Price Paid joined parquet file"),
        ("--arcgis", True, "ArcGIS postcode data parquet file"),
        ("--iod", False, "Index of Deprivation parquet file (optional)"),
        ("--poi-proximity", False, "POI proximity counts parquet file (optional)"),
        ("--journey-times", False, "Journey times parquet file (optional)"),
        ("--output", True, "Output parquet file path"),
    )
    for flag, is_required, help_text in option_table:
        cli.add_argument(flag, type=Path, required=is_required, help=help_text)

    args = cli.parse_args()

    wide = _build_wide(
        epc_pp_path=args.epc_pp,
        arcgis_path=args.arcgis,
        iod_path=args.iod,
        poi_proximity_path=args.poi_proximity,
        journey_times_path=args.journey_times,
    )

    print(f"Columns: {wide.columns}")
    print(f"Rows: {wide.height}")

    wide.write_parquet(args.output)

    # Report the on-disk size of the file that was just written, in MiB.
    bytes_per_mb = 1024 * 1024
    size_mb = args.output.stat().st_size / bytes_per_mb
    print(f"Wrote {args.output} ({size_mb:.1f} MB)")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()