Move transform logic around
This commit is contained in:
parent
e1b38a1b95
commit
38b0cf1ea1
14 changed files with 1073 additions and 336 deletions
127
pipeline/transform/merge.py
Normal file
127
pipeline/transform/merge.py
Normal file
|
|
@ -0,0 +1,127 @@
|
|||
import argparse
|
||||
import polars as pl
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def _build_wide(
|
||||
epc_pp_path: Path,
|
||||
arcgis_path: Path,
|
||||
iod_path: Path | None,
|
||||
poi_proximity_path: Path | None,
|
||||
journey_times_path: Path | None,
|
||||
) -> pl.DataFrame:
|
||||
"""Build the wide dataframe by joining epc_pp with all auxiliary data."""
|
||||
print("Loading epc_pp...")
|
||||
wide = pl.read_parquet(epc_pp_path)
|
||||
print(f" {wide.shape[0]:,} rows, {wide.estimated_size('mb'):.1f} MB")
|
||||
|
||||
# GPS coordinates + LSOA from ArcGIS
|
||||
print("Joining GPS coordinates...")
|
||||
arcgis = pl.read_parquet(arcgis_path).select(
|
||||
pl.col("pcds").alias("postcode"),
|
||||
"lat",
|
||||
pl.col("long").alias("lon"),
|
||||
"lsoa21",
|
||||
)
|
||||
wide = wide.join(arcgis, on="postcode", how="inner")
|
||||
print(f" {wide.shape[0]:,} rows after GPS join, {wide.estimated_size('mb'):.1f} MB")
|
||||
|
||||
# Journey times (optional)
|
||||
if journey_times_path and journey_times_path.exists():
|
||||
print("Joining journey times...")
|
||||
journey_times = pl.read_parquet(journey_times_path).select(
|
||||
"postcode",
|
||||
"public_transport_easy_minutes",
|
||||
"public_transport_quick_minutes",
|
||||
"cycling_minutes",
|
||||
)
|
||||
wide = wide.join(journey_times, on="postcode", how="left")
|
||||
print(f" {wide.estimated_size('mb'):.1f} MB after journey times")
|
||||
|
||||
# Index of Deprivation
|
||||
if iod_path and iod_path.exists():
|
||||
print("Joining IoD scores...")
|
||||
iod = pl.read_parquet(iod_path)
|
||||
wide = wide.join(
|
||||
iod, left_on="lsoa21", right_on="LSOA code (2021)", how="left"
|
||||
)
|
||||
print(f" {wide.estimated_size('mb'):.1f} MB after IoD")
|
||||
|
||||
# POI proximity counts (pre-computed per postcode)
|
||||
if poi_proximity_path and poi_proximity_path.exists():
|
||||
print("Joining POI proximity counts...")
|
||||
poi_counts = pl.read_parquet(poi_proximity_path)
|
||||
wide = wide.join(poi_counts, on="postcode", how="left")
|
||||
print(f" {wide.estimated_size('mb'):.1f} MB after POI counts")
|
||||
|
||||
# Convert construction_age_band to numeric year
|
||||
if "construction_age_band" in wide.columns:
|
||||
wide = wide.with_columns(
|
||||
pl.col("construction_age_band")
|
||||
.str.replace("England and Wales: ", "")
|
||||
.str.replace(" onwards", "")
|
||||
.str.extract(r"(\d{4})", 1)
|
||||
.cast(pl.UInt16, strict=False)
|
||||
.alias("construction_age_band"),
|
||||
)
|
||||
|
||||
# Derived columns
|
||||
wide = wide.with_columns(
|
||||
(pl.col("latest_price") / pl.col("total_floor_area")).alias("Price per sqm"),
|
||||
).drop(
|
||||
'date_of_transfer',
|
||||
'inspection_date',
|
||||
'floor_height',
|
||||
'lsoa21',
|
||||
'LSOA code (2021)',
|
||||
'Local Authority District code (2024)',
|
||||
'Local Authority District name (2024)',
|
||||
'imd_score',
|
||||
'housing_barriers_score',
|
||||
'idaci_score',
|
||||
'idaopi_score',
|
||||
'children_young_people_score',
|
||||
'adult_skills_score',
|
||||
'geographical_barriers_score',
|
||||
'wider_barriers_score',
|
||||
).rename({
|
||||
'construction_age_band': "Approximate construction age",
|
||||
"income_score": "Income Score (rate)",
|
||||
"employment_score": "Employment Score (rate)",
|
||||
"education_score": "Education, Skills and Training Score",
|
||||
"health_score": "Health Deprivation and Disability Score",
|
||||
"crime_score": "Crime Score",
|
||||
})
|
||||
|
||||
return wide
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Build wide property dataframe with all joins")
|
||||
parser.add_argument("--epc-pp", type=Path, required=True, help="EPC-Price Paid joined parquet file")
|
||||
parser.add_argument("--arcgis", type=Path, required=True, help="ArcGIS postcode data parquet file")
|
||||
parser.add_argument("--iod", type=Path, help="Index of Deprivation parquet file (optional)")
|
||||
parser.add_argument("--poi-proximity", type=Path, help="POI proximity counts parquet file (optional)")
|
||||
parser.add_argument("--journey-times", type=Path, help="Journey times parquet file (optional)")
|
||||
parser.add_argument("--output", type=Path, required=True, help="Output parquet file path")
|
||||
args = parser.parse_args()
|
||||
|
||||
wide = _build_wide(
|
||||
epc_pp_path=args.epc_pp,
|
||||
arcgis_path=args.arcgis,
|
||||
iod_path=args.iod,
|
||||
poi_proximity_path=args.poi_proximity,
|
||||
journey_times_path=args.journey_times,
|
||||
)
|
||||
|
||||
print(f"Columns: {wide.columns}")
|
||||
print(f"Rows: {wide.height}")
|
||||
|
||||
wide.write_parquet(args.output)
|
||||
size_mb = args.output.stat().st_size / (1024 * 1024)
|
||||
|
||||
print(f"Wrote {args.output} ({size_mb:.1f} MB)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue