"""Fuzzy join EPC certificate data with Price Paid data and write a parquet file."""

import argparse
from pathlib import Path

import polars as pl

from ..utils import fuzzy_join_on_postcode

# Matched properties whose EPC TOTAL_FLOOR_AREA is below this value are dropped.
MIN_FLOOR_AREA_M2 = 10

# Negative value: do not truncate columns when printing frames.
pl.Config.set_tbl_cols(-1)


def _parse_args() -> argparse.Namespace:
    """Parse the ``--epc``, ``--price-paid`` and ``--output`` command-line paths."""
    parser = argparse.ArgumentParser(description="Fuzzy join EPC and Price Paid data")
    parser.add_argument(
        "--epc", type=Path, required=True, help="EPC certificates CSV file"
    )
    parser.add_argument(
        "--price-paid", type=Path, required=True, help="Price paid parquet file"
    )
    parser.add_argument(
        "--output", type=Path, required=True, help="Output parquet file path"
    )
    return parser.parse_args()


def _load_epc(path: Path) -> pl.LazyFrame:
    """Lazily read the EPC CSV and keep one row per (address, postcode)."""
    return (
        pl.scan_csv(path)
        .select(
            pl.col("ADDRESS").alias("epc_address"),
            "POSTCODE",
            "CURRENT_ENERGY_RATING",
            "POTENTIAL_ENERGY_RATING",
            pl.col("PROPERTY_TYPE").alias("epc_property_type"),
            "BUILT_FORM",
            "INSPECTION_DATE",
            "TOTAL_FLOOR_AREA",
            "NUMBER_HABITABLE_ROOMS",
            "FLOOR_HEIGHT",
            "CONSTRUCTION_AGE_BAND",
        )
        .filter(pl.col("epc_address").is_not_null())
        # Newest inspection first, so `.first()` keeps the latest certificate.
        # NOTE(review): assumes INSPECTION_DATE sorts chronologically (a date
        # dtype or ISO-8601 strings) -- confirm against the CSV.
        .sort("INSPECTION_DATE", descending=True)
        .group_by("epc_address", "POSTCODE")
        .first()
    )


def _load_price_paid(path: Path) -> pl.LazyFrame:
    """Lazily read Price Paid transactions and aggregate them per property.

    A property is identified by (pp_address, postcode), where ``pp_address``
    is ``saon``, ``paon`` and ``street`` joined by single spaces. Each output
    row carries the full price history plus the latest price, type, duration
    and transfer date, and the first transfer date / ``old_new`` flag.
    """
    # https://www.gov.uk/guidance/about-the-price-paid-data
    property_type_map = {
        "D": "Detached",
        "S": "Semi-Detached",
        "T": "Terraced",
        "F": "Flats/Maisonettes",
        "O": "Other",
    }
    duration_map = {"F": "Freehold", "L": "Leasehold"}

    has_address = pl.col("pp_address").is_not_null() & (pl.col("pp_address") != "")

    return (
        pl.scan_parquet(path)
        .select(
            "price",
            "date_of_transfer",
            pl.col("property_type")
            .alias("pp_property_type")
            .replace(property_type_map),
            "postcode",
            "paon",
            "saon",
            "street",
            "locality",
            "town_city",
            pl.col("duration").replace(duration_map),
            "old_new",
        )
        .filter(pl.col("pp_property_type") != "Other")
        .with_columns(
            pl.concat_str(
                [pl.col("saon"), pl.col("paon"), pl.col("street")],
                separator=" ",
                ignore_nulls=True,
            ).alias("pp_address"),
        )
        # With ignore_nulls=True, concat_str yields "" (not null) when every
        # part is null, so a null check alone would let address-less rows
        # through; reject the empty string as well.
        .filter(has_address)
        # Chronological order makes `.first()` / `.last()` below mean
        # earliest / latest transaction within each property.
        .sort("date_of_transfer")
        .group_by("pp_address", "postcode", maintain_order=True)
        .agg(
            pl.struct(
                pl.col("date_of_transfer").dt.year().alias("year"),
                "price",
            ).alias("historical_prices"),
            pl.col("pp_property_type").last(),
            pl.col("duration").last(),
            pl.col("price").last().alias("latest_price"),
            pl.col("date_of_transfer").last(),
            pl.col("date_of_transfer").first().alias("first_transfer_date"),
            pl.col("old_new").first(),
        )
    )


def _with_construction_year(matched: pl.DataFrame) -> pl.DataFrame:
    """Replace CONSTRUCTION_AGE_BAND with a year and flag whether it is approximate.

    For new-builds (old_new == "Y"), the year of the first transaction is used
    as the exact construction year; otherwise the first four-digit year found
    in the EPC age band is used. ``is_construction_date_approximate`` is 0 for
    the former, 1 for the latter, and null when no year could be derived.
    The helper columns ``old_new`` and ``first_transfer_date`` are dropped.
    """
    # The first 4-digit run is the band's starting year; surrounding text such
    # as "England and Wales: " or " onwards" holds no digits, so it needs no
    # stripping before extraction. Unparseable bands become null.
    epc_band_year = (
        pl.col("CONSTRUCTION_AGE_BAND")
        .str.extract(r"(\d{4})", 1)
        .cast(pl.UInt16, strict=False)
    )
    transfer_year = (
        pl.col("first_transfer_date").dt.year().cast(pl.UInt16, strict=False)
    )
    use_transfer_year = (pl.col("old_new") == "Y") & transfer_year.is_not_null()

    # Both expressions live in one with_columns call so that `epc_band_year`
    # still reads the original CONSTRUCTION_AGE_BAND text, not the overwrite.
    return matched.with_columns(
        pl.when(use_transfer_year)
        .then(transfer_year)
        .otherwise(epc_band_year)
        .alias("CONSTRUCTION_AGE_BAND"),
        pl.when(use_transfer_year)
        .then(pl.lit(0, dtype=pl.UInt8))
        .when(epc_band_year.is_not_null())
        .then(pl.lit(1, dtype=pl.UInt8))
        .otherwise(pl.lit(None, dtype=pl.UInt8))
        .alias("is_construction_date_approximate"),
    ).drop("old_new", "first_transfer_date")


def main() -> None:
    """Join Price Paid properties to EPC certificates and write them to parquet.

    Reads the inputs named on the command line, prints a preview of each
    dataset and the match statistics, keeps matched rows with a floor area of
    at least ``MIN_FLOOR_AREA_M2``, derives a construction year, lower-cases
    all column names and writes the result to ``--output``.
    """
    args = _parse_args()

    epc = _load_epc(args.epc)
    print("EPC dataset")
    print(epc.head().collect())

    price_paid = _load_price_paid(args.price_paid)
    print("Price paid dataset")
    print(price_paid.head().collect())

    # NOTE(review): fuzzy_join_on_postcode is defined elsewhere; the filter
    # below implies it can return rows with either address missing -- confirm.
    joined = (
        fuzzy_join_on_postcode(
            left=price_paid,
            right=epc,
            left_address_col="pp_address",
            right_address_col="epc_address",
            left_postcode_col="postcode",
            right_postcode_col="POSTCODE",
        )
        .drop("POSTCODE")
        .collect(engine="streaming")
    )

    matched = joined.filter(
        pl.col("epc_address").is_not_null() & pl.col("pp_address").is_not_null()
    )

    total = joined.height
    # Guard against an empty join result, which would otherwise divide by zero.
    matched_pct = 100 * matched.height / total if total else 0.0
    print(f"Unique properties: {total}")
    print(f"Matched: {matched.height} ({matched_pct:.1f}%)")
    print(f"Unmatched: {total - matched.height}")

    matched = matched.filter(pl.col("TOTAL_FLOOR_AREA") >= MIN_FLOOR_AREA_M2)
    matched = _with_construction_year(matched)

    # Build the mapping from the frame being renamed: `joined` still lists
    # columns that have since been dropped and lacks the ones added above.
    matched = matched.rename(
        {col: col.lower() for col in matched.columns if col != col.lower()}
    )

    print(matched.head())
    matched.write_parquet(args.output)
    print(f"Wrote {args.output}")


if __name__ == "__main__":
    main()