"""Fuzzy join EPC certificates with Price Paid data and write the result to parquet.

EPC certificates are deduplicated to the latest one per (address, postcode) and
enriched with a renovation history and a former-social-tenure flag. Price Paid
transactions are aggregated per address. The two are then matched via
``fuzzy_join_on_postcode``.
"""

import argparse
from pathlib import Path

import polars as pl

from ..utils import fuzzy_join_on_postcode

pl.Config.set_tbl_cols(-1)

# Lower rank = better energy rating (A best, G worst).
RATING_RANK = {"A": 1, "B": 2, "C": 3, "D": 4, "E": 5, "F": 6, "G": 7}
# Price Paid transactions below this price are discarded.
MIN_PRICE = 50_000

# Columns that identify a single property in the EPC data.
_EPC_KEY = ("epc_address", "POSTCODE")


def _parse_args() -> argparse.Namespace:
    """Parse the --epc, --price-paid and --output command-line options."""
    parser = argparse.ArgumentParser(description="Fuzzy join EPC and Price Paid data")
    parser.add_argument(
        "--epc", type=Path, required=True, help="EPC certificates CSV file"
    )
    parser.add_argument(
        "--price-paid", type=Path, required=True, help="Price paid parquet file"
    )
    parser.add_argument(
        "--output", type=Path, required=True, help="Output parquet file path"
    )
    return parser.parse_args()


def _scan_epc(path: Path) -> pl.LazyFrame:
    """Lazily load the EPC columns used downstream (one row per certificate)."""
    return (
        pl.scan_csv(path)
        .select(
            pl.col("ADDRESS").alias("epc_address"),
            "POSTCODE",
            "CURRENT_ENERGY_RATING",
            "POTENTIAL_ENERGY_RATING",
            pl.col("PROPERTY_TYPE").alias("epc_property_type"),
            "BUILT_FORM",
            "INSPECTION_DATE",
            "TOTAL_FLOOR_AREA",
            "NUMBER_HABITABLE_ROOMS",
            "FLOOR_HEIGHT",
            "CONSTRUCTION_AGE_BAND",
            "TENURE",
        )
        .filter(pl.col("epc_address").is_not_null())
        .with_columns(
            # A habitable-room count of 0 is replaced with null (treated as missing).
            pl.when(pl.col("NUMBER_HABITABLE_ROOMS") == 0)
            .then(None)
            .otherwise(pl.col("NUMBER_HABITABLE_ROOMS"))
            .alias("NUMBER_HABITABLE_ROOMS"),
        )
    )


def _latest_certificates(epc_base: pl.LazyFrame) -> pl.LazyFrame:
    """Keep one certificate per property: the first in descending INSPECTION_DATE order."""
    # NOTE(review): assumes INSPECTION_DATE sorts chronologically (e.g. ISO
    # YYYY-MM-DD strings) — confirm against the EPC export format.
    return (
        epc_base.sort("INSPECTION_DATE", descending=True)
        .group_by(*_EPC_KEY)
        .first()
        .drop("TENURE")
    )


def _previous_in_property(column: str) -> pl.Expr:
    """Value of *column* on the preceding row of the same property."""
    return pl.col(column).shift(1).over(*_EPC_KEY)


def _renovation_events(epc_base: pl.LazyFrame) -> pl.DataFrame:
    """Detect events between consecutive certificates of each property.

    Each certificate is compared with the previous one for the same property
    (rows sorted by INSPECTION_DATE). At most one event is recorded per
    certificate, checked in this order of precedence:

    * ``"Remodelling"`` – habitable-room count changed;
    * ``"Extension"``   – total floor area increased;
    * ``"Renovation"``  – current energy rating improved (lower RATING_RANK).

    Returns:
        An eager DataFrame with one row per property that has at least one
        event. Its ``renovation_history`` column is a list of
        ``{year, event}`` structs.

    The result is collected eagerly. The original author reports that
    ``.over()`` window functions are not supported by the streaming engine,
    which is what the final join below is collected with. Re-check this
    when upgrading polars.
    """
    rooms, prev_rooms = pl.col("NUMBER_HABITABLE_ROOMS"), pl.col("_prev_rooms")
    area, prev_area = pl.col("TOTAL_FLOOR_AREA"), pl.col("_prev_area")
    rank, prev_rank = pl.col("_rating_rank"), pl.col("_prev_rating_rank")

    return (
        epc_base.sort("INSPECTION_DATE")
        .with_columns(
            pl.col("CURRENT_ENERGY_RATING")
            .replace_strict(RATING_RANK, default=None, return_dtype=pl.Int32)
            .alias("_rating_rank"),
        )
        # Separate step: _prev_rating_rank needs _rating_rank to exist first.
        .with_columns(
            _previous_in_property("NUMBER_HABITABLE_ROOMS").alias("_prev_rooms"),
            _previous_in_property("TOTAL_FLOOR_AREA").alias("_prev_area"),
            _previous_in_property("_rating_rank").alias("_prev_rating_rank"),
        )
        .with_columns(
            pl.when(
                rooms.is_not_null()
                & prev_rooms.is_not_null()
                & (rooms != prev_rooms)
            )
            .then(pl.lit("Remodelling"))
            .when(
                area.is_not_null()
                & prev_area.is_not_null()
                & (area > prev_area)
            )
            .then(pl.lit("Extension"))
            .when(
                rank.is_not_null()
                & prev_rank.is_not_null()
                & (rank < prev_rank)
            )
            .then(pl.lit("Renovation"))
            .otherwise(pl.lit(None, dtype=pl.String))
            .alias("_event"),
        )
        .filter(pl.col("_event").is_not_null())
        .with_columns(
            # Year = first four characters of INSPECTION_DATE.
            pl.col("INSPECTION_DATE")
            .cast(pl.String)
            .str.slice(0, 4)
            .cast(pl.Int32)
            .alias("_event_year"),
        )
        .group_by(*_EPC_KEY)
        .agg(
            pl.struct(
                pl.col("_event_year").alias("year"),
                pl.col("_event").alias("event"),
            ).alias("renovation_history"),
        )
        .collect()
    )


def _social_tenure(epc_base: pl.LazyFrame) -> pl.DataFrame:
    """Properties with any certificate whose TENURE mentions "social" (case-insensitive).

    Returns one row per property with ``was_council_house = "Yes"``.
    """
    return (
        epc_base.filter(pl.col("TENURE").str.to_lowercase().str.contains("social"))
        .select(*_EPC_KEY)
        .unique()
        .with_columns(pl.lit("Yes").alias("was_council_house"))
        .collect()
    )


def _scan_price_paid(path: Path) -> pl.LazyFrame:
    """Lazily aggregate Price Paid transactions into one row per (address, postcode).

    Code meanings follow https://www.gov.uk/guidance/about-the-price-paid-data.
    """
    property_type_map = {
        "D": "Detached",
        "S": "Semi-Detached",
        "T": "Terraced",
        "F": "Flats/Maisonettes",
        "O": "Other",
    }
    duration_map = {"F": "Freehold", "L": "Leasehold"}

    return (
        pl.scan_parquet(path)
        .select(
            "price",
            "date_of_transfer",
            pl.col("property_type")
            .alias("pp_property_type")
            .replace(property_type_map),
            pl.col("postcode").str.strip_chars(),
            "paon",
            "saon",
            "street",
            "locality",
            "town_city",
            pl.col("duration").replace(duration_map),
            "old_new",
        )
        .filter(pl.col("pp_property_type") != "Other")
        .filter(pl.col("price") >= MIN_PRICE)
        .with_columns(
            # Address key: SAON, PAON and street joined by spaces, nulls skipped.
            pl.concat_str(
                [pl.col("saon"), pl.col("paon"), pl.col("street")],
                separator=" ",
                ignore_nulls=True,
            ).alias("pp_address"),
        )
        # Sorting by date makes .first()/.last() in the aggregation pick the
        # earliest/latest transaction for each address.
        .sort("date_of_transfer")
        .group_by("pp_address", "postcode", maintain_order=True)
        .agg(
            pl.struct(
                pl.col("date_of_transfer").dt.year().alias("year"),
                pl.col("date_of_transfer").dt.month().cast(pl.UInt8).alias("month"),
                "price",
            ).alias("historical_prices"),
            pl.col("pp_property_type").last(),
            pl.col("duration").last(),
            pl.col("price").last().alias("latest_price"),
            pl.col("date_of_transfer").last(),
            pl.col("date_of_transfer").first().alias("first_transfer_date"),
            pl.col("old_new").first(),
        )
    ).filter(pl.col("pp_address").is_not_null())


def _derive_construction_year(joined: pl.DataFrame) -> pl.DataFrame:
    """Replace CONSTRUCTION_AGE_BAND with a numeric year and flag whether it is approximate.

    For new-builds (old_new == "Y") with a known first transfer date, the
    year of that first transfer is used as an exact construction year
    (flag 0). Otherwise the first four-digit year in the EPC age band is
    used (flag 1), or null when the band has none.
    Drops the helper columns ``old_new`` and ``first_transfer_date``.
    """
    epc_band_year = (
        pl.col("CONSTRUCTION_AGE_BAND")
        .str.replace("England and Wales: ", "")
        .str.replace(" onwards", "")
        .str.extract(r"(\d{4})", 1)
        .cast(pl.UInt16, strict=False)
    )
    transfer_year = (
        pl.col("first_transfer_date").dt.year().cast(pl.UInt16, strict=False)
    )
    use_transfer_year = (pl.col("old_new") == "Y") & transfer_year.is_not_null()

    # Both expressions read the ORIGINAL CONSTRUCTION_AGE_BAND: with_columns
    # evaluates them against the input frame, not each other's output.
    return joined.with_columns(
        pl.when(use_transfer_year)
        .then(transfer_year)
        .otherwise(epc_band_year)
        .alias("CONSTRUCTION_AGE_BAND"),
        pl.when(use_transfer_year)
        .then(pl.lit(0, dtype=pl.UInt8))
        .when(epc_band_year.is_not_null())
        .then(pl.lit(1, dtype=pl.UInt8))
        .otherwise(pl.lit(None, dtype=pl.UInt8))
        .alias("is_construction_date_approximate"),
    ).drop("old_new", "first_transfer_date")


def main():
    """Run the EPC / Price Paid fuzzy join and write the result to ``--output``."""
    args = _parse_args()

    epc_base = _scan_epc(args.epc)
    epc = _latest_certificates(epc_base)

    events = _renovation_events(epc_base)
    event_counts = (
        events["renovation_history"].explode().struct.field("event").value_counts()
    )
    print(f"Renovation events: {events.height} properties with events")
    print(event_counts)

    social_tenure = _social_tenure(epc_base)
    print(f"Former council houses (EPC social tenure): {social_tenure.height}")

    # Left-join events and social tenure back onto the deduplicated EPC rows;
    # properties never seen with social tenure get "No".
    epc = (
        epc.join(events.lazy(), on=list(_EPC_KEY), how="left")
        .join(social_tenure.lazy(), on=list(_EPC_KEY), how="left")
        .with_columns(pl.col("was_council_house").fill_null("No"))
    )
    print("EPC dataset")
    print(epc.head().collect())

    price_paid = _scan_price_paid(args.price_paid)
    print("Price paid dataset")
    print(price_paid.head().collect())

    joined = (
        fuzzy_join_on_postcode(
            left=price_paid,
            right=epc,
            left_address_col="pp_address",
            right_address_col="epc_address",
            left_postcode_col="postcode",
            right_postcode_col="POSTCODE",
        )
        .drop("POSTCODE")
        .collect(engine="streaming")
    )

    matched = joined.filter(
        pl.col("epc_address").is_not_null() & pl.col("pp_address").is_not_null()
    )
    total = joined.height
    # Guard against an empty join result, which previously raised
    # ZeroDivisionError before the output file could be written.
    match_pct = 100 * matched.height / total if total else 0.0
    print(f"Unique properties: {total}")
    print(f"Matched: {matched.height} ({match_pct:.1f}%)")
    print(f"Unmatched: {total - matched.height}")

    joined = _derive_construction_year(joined)
    joined = joined.rename({col: col.lower() for col in joined.columns})
    print(joined.head())
    joined.write_parquet(args.output)
    print(f"Wrote {args.output}")


if __name__ == "__main__":
    main()