import argparse
import csv
import io
import tempfile
import zipfile
from pathlib import Path

import polars as pl
import pyarrow as pa
import pyarrow.csv as pa_csv
import pyarrow.parquet as pq

from pipeline.local_temp import local_tmp_dir

from ..utils import (
    fuzzy_join_on_postcode,
    normalize_address_key,
    normalize_postcode_key,
)

pl.Config.set_tbl_cols(-1)

RATING_RANK = {"A": 1, "B": 2, "C": 3, "D": 4, "E": 5, "F": 6, "G": 7}

# Value-quality floor for price aggregations. A flat nominal floor is a blunt
# tool against a deflating threshold — £50k was completely normal for a 1990s
# house, so a 50k floor wrongly discarded ~a third of legitimate 1990s
# open-market sales (and deleted properties whose only sales were old/cheap),
# biasing early-year price history upward. 10k recovers the large [10k,50k)
# band of genuine cheaper sales while still excluding the nominal/junk transfers
# (£1 etc.). A small tail of real sub-10k sales is still dropped — a deliberate
# conservative tradeoff to keep clearly-implausible transfers out.
MIN_PRICE = 10_000

# Plausible construction-year range; band-derived years outside it (e.g. OCR
# noise like 1012 or 2202) are nulled rather than published.
MIN_BUILD_YEAR = 1700
MAX_BUILD_YEAR = 2030


def epc_band_to_year(band: pl.Expr) -> pl.Expr:
    """Map an EPC construction age band to a single representative build year.

    EPC age bands are ranges (e.g. ``1950-1966``); we use the band MIDPOINT
    (1958) rather than the lower bound, which previously biased every
    band-derived year ~10-15 years too early. Open-ended lower bands
    (``before 1900``) are too wide to pin to a year and return null.
    Single-year / ``... onwards`` bands use that year. Already-numeric inputs
    (a year produced by an earlier call) pass through unchanged. Years outside
    [MIN_BUILD_YEAR, MAX_BUILD_YEAR] are nulled.

    Args:
        band: Expression yielding the raw age band (or an already-derived year).

    Returns:
        A ``UInt16`` expression holding the representative year, or null.
    """
    text = (
        band.cast(pl.Utf8)
        .str.replace("England and Wales: ", "")
        .str.replace(" onwards", "")
    )
    # First four-digit run is the lower bound (or the only year present).
    low = text.str.extract(r"(\d{4})", 1).cast(pl.Int32, strict=False)
    # A second four-digit run, when present, is the band's upper bound.
    high = text.str.extract(r"(\d{4})\D+(\d{4})", 2).cast(pl.Int32, strict=False)
    year = (
        pl.when(text.str.starts_with("before "))
        .then(None)
        .when(high.is_not_null())
        .then(((low + high) / 2).round(0).cast(pl.Int32))
        .otherwise(low)
    )
    return (
        pl.when((year >= MIN_BUILD_YEAR) & (year <= MAX_BUILD_YEAR))
        .then(year)
        .otherwise(None)
        .cast(pl.UInt16, strict=False)
    )


EPC_SOURCE_COLUMNS = [
    "address",
    "postcode",
    "uprn",
    "current_energy_rating",
    "potential_energy_rating",
    "property_type",
    "built_form",
    "inspection_date",
    "total_floor_area",
    "number_habitable_rooms",
    "floor_height",
    "construction_age_band",
    "tenure",
]


def _normalise_csv_columns(columns: list[str]) -> list[str]:
    """Return the column names stripped of surrounding whitespace and lowercased."""
    return [column.strip().lower() for column in columns]


def _clean_string(column: str) -> pl.Expr:
    """Return *column* as a stripped string, with empty strings turned into null."""
    stripped = pl.col(column).cast(pl.String).str.strip_chars()
    return pl.when(stripped == "").then(None).otherwise(stripped)


def _clean_number(column: str, dtype: pl.DataType) -> pl.Expr:
    """Return *column* cleaned as a string and then non-strictly cast to *dtype*."""
    return _clean_string(column).cast(dtype, strict=False)


def _select_epc_columns(raw: pl.LazyFrame) -> pl.LazyFrame:
    """Project the raw EPC frame onto the cleaned, typed columns used downstream.

    Rows without an address are dropped, and a habitable-room count of 0 is
    replaced with null.
    """
    return (
        raw.select(
            _clean_string("address").alias("epc_address"),
            _clean_string("postcode").str.to_uppercase().alias("epc_postcode"),
            # UPRN keys an exact listing->EPC join downstream (~99% populated).
            _clean_string("uprn").alias("uprn"),
            _clean_string("current_energy_rating")
            .str.to_uppercase()
            .alias("current_energy_rating"),
            _clean_string("potential_energy_rating")
            .str.to_uppercase()
            .alias("potential_energy_rating"),
            _clean_string("property_type").alias("epc_property_type"),
            _clean_string("built_form").alias("built_form"),
            # Parse to a real Date once (unparseable/blank -> null) so dedup can
            # sort newest-first with nulls_last and _event_year can use dt.year();
            # a lexicographic string sort would let a null/garbled date win under
            # Polars' default nulls-first descending order. EPC inspection dates
            # are ISO (YYYY-MM-DD).
            _clean_string("inspection_date")
            .str.to_date(format="%Y-%m-%d", strict=False)
            .alias("inspection_date"),
            _clean_number("total_floor_area", pl.Float64).alias("total_floor_area"),
            _clean_number("number_habitable_rooms", pl.Int16).alias(
                "number_habitable_rooms"
            ),
            _clean_number("floor_height", pl.Float64).alias("floor_height"),
            _clean_string("construction_age_band").alias("construction_age_band"),
            _clean_string("tenure").alias("tenure"),
        )
        .filter(pl.col("epc_address").is_not_null())
        .with_columns(
            pl.when(pl.col("number_habitable_rooms") == 0)
            .then(None)
            .otherwise(pl.col("number_habitable_rooms"))
            .alias("number_habitable_rooms"),
        )
    )


def _certificate_member_names(zip_file: zipfile.ZipFile) -> list[str]:
    """Return the sorted names of the certificate CSV members in *zip_file*.

    A member qualifies when it is not a directory entry, its base name starts
    with ``certificates`` (case-insensitive) and its name ends with ``.csv``.
    """
    return sorted(
        name
        for name in zip_file.namelist()
        if not name.endswith("/")
        and Path(name).name.lower().startswith("certificates")
        and name.lower().endswith(".csv")
    )


def _read_zip_csv_header(zip_file: zipfile.ZipFile, member_name: str) -> list[str]:
    """Return the first CSV row (the header) of *member_name* inside *zip_file*.

    Raises:
        ValueError: If the member contains no rows at all.
    """
    with zip_file.open(member_name) as member:
        # utf-8-sig drops a leading byte-order mark from the first header cell.
        text = io.TextIOWrapper(member, encoding="utf-8-sig", newline="")
        try:
            return next(csv.reader(text))
        except StopIteration as exc:
            raise ValueError(f"EPC CSV member is empty: {member_name}") from exc


def _source_columns_for_header(header: list[str]) -> list[str]:
    """Return, for each EPC_SOURCE_COLUMNS entry, its spelling in *header*.

    Header names are matched after normalisation (strip + lowercase). A column
    absent from the header is returned under its canonical name unchanged.
    """
    columns_by_normalised_name = {
        normalised: source
        for source, normalised in zip(header, _normalise_csv_columns(header))
    }
    return [
        columns_by_normalised_name.get(column, column) for column in EPC_SOURCE_COLUMNS
    ]


def _zip_certificates_to_parquet(zip_path: Path, output_path: Path) -> None:
    """Stream every certificate CSV inside *zip_path* into one Parquet file.

    All columns are written as strings under the canonical EPC_SOURCE_COLUMNS
    names; typing and cleaning happen later in ``_select_epc_columns``.

    Raises:
        ValueError: If *zip_path* is not a readable zip archive, contains no
            certificate CSV members, or a member is empty.
    """
    schema = pa.schema((column, pa.string()) for column in EPC_SOURCE_COLUMNS)

    # Validate the archive before creating the writer so a bad or empty
    # download does not leave a stray output file behind.
    try:
        zip_file = zipfile.ZipFile(zip_path)
    except zipfile.BadZipFile as exc:
        raise ValueError(
            f"{zip_path} is not a readable EPC zip archive; re-download "
            "domestic-csv.zip and try again"
        ) from exc

    with zip_file:
        member_names = _certificate_member_names(zip_file)
        if not member_names:
            raise ValueError(f"No certificate CSV files found in {zip_path}")

        read_options = pa_csv.ReadOptions(block_size=64 * 1024 * 1024)
        with pq.ParquetWriter(output_path, schema=schema, compression="zstd") as writer:
            for member_name in member_names:
                print(f"Reading EPC certificates from {member_name}")
                source_columns = _source_columns_for_header(
                    _read_zip_csv_header(zip_file, member_name)
                )
                convert_options = pa_csv.ConvertOptions(
                    include_columns=source_columns,
                    include_missing_columns=True,
                    column_types={
                        source_column: pa.string() for source_column in source_columns
                    },
                    strings_can_be_null=True,
                )
                with zip_file.open(member_name) as member:
                    reader = pa_csv.open_csv(
                        member,
                        read_options=read_options,
                        convert_options=convert_options,
                    )
                    for batch in reader:
                        if batch.num_rows == 0:
                            continue
                        # Columns arrive in include_columns order, which
                        # mirrors EPC_SOURCE_COLUMNS, so a positional rename
                        # yields the canonical names.
                        writer.write_batch(batch.rename_columns(EPC_SOURCE_COLUMNS))


def _scan_epc_certificates(epc_path: Path, temp_dir: Path) -> pl.LazyFrame:
    """Return a lazy, cleaned EPC frame read from a CSV file or a zip archive.

    A ``.zip`` input is first converted to a Parquet file inside *temp_dir*;
    any other input is scanned as CSV with every column read as a string.
    """
    if epc_path.suffix.lower() == ".zip":
        parquet_path = temp_dir / "epc-certificates.parquet"
        _zip_certificates_to_parquet(epc_path, parquet_path)
        raw = pl.scan_parquet(parquet_path)
    else:
        raw = pl.scan_csv(
            epc_path,
            infer_schema=False,
            with_column_names=_normalise_csv_columns,
        )
    return _select_epc_columns(raw)


def main() -> None:
    """Parse the command-line arguments and run the EPC / Price Paid join."""
    parser = argparse.ArgumentParser(description="Fuzzy join EPC and Price Paid data")
    parser.add_argument(
        "--epc", type=Path, required=True, help="EPC certificates CSV file or zip"
    )
    parser.add_argument(
        "--price-paid", type=Path, required=True, help="Price paid parquet file"
    )
    parser.add_argument(
        "--output", type=Path, required=True, help="Output parquet file path"
    )
    args = parser.parse_args()

    with tempfile.TemporaryDirectory(
        prefix="epc_certificates_", dir=local_tmp_dir()
    ) as tmpdir:
        _run(args.epc, args.price_paid, args.output, Path(tmpdir))


def _run(
    epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Path
) -> None:
    """Build the joined EPC / Price Paid dataset and write it to *output_path*.

    Args:
        epc_path: EPC certificates CSV file or zip archive.
        price_paid_path: Price Paid parquet file.
        output_path: Destination parquet file.
        temp_dir: Scratch directory for the intermediate EPC parquet file.
    """
    epc_base = _scan_epc_certificates(epc_path, temp_dir).with_columns(
        normalize_address_key(pl.col("epc_address")).alias("_epc_match_address"),
        normalize_postcode_key(pl.col("epc_postcode")).alias("_epc_match_postcode"),
    )

    # Dedup fork: keep latest certificate per property. inspection_date is a typed
    # Date (see _select_epc_columns); nulls_last keeps a real-dated cert ahead of a
    # null/unparseable-dated one so the genuinely newest certificate is chosen.
    epc = (
        epc_base.sort("inspection_date", descending=True, nulls_last=True)
        .group_by("_epc_match_address", "_epc_match_postcode")
        .first()
        .drop("tenure")
    )

    # Events fork: detect renovation events between consecutive certificates
    # Collect eagerly because .over() window functions don't work in streaming
    # engine (fuzzy_join.py:50 uses sink_parquet which requires streaming).
    events = (
        epc_base.sort("inspection_date")
        .with_columns(
            pl.col("current_energy_rating")
            .replace_strict(RATING_RANK, default=None, return_dtype=pl.Int32)
            .alias("_rating_rank"),
        )
        .with_columns(
            pl.col("number_habitable_rooms")
            .shift(1)
            .over("_epc_match_address", "_epc_match_postcode")
            .alias("_prev_rooms"),
            pl.col("total_floor_area")
            .shift(1)
            .over("_epc_match_address", "_epc_match_postcode")
            .alias("_prev_area"),
            pl.col("_rating_rank")
            .shift(1)
            .over("_epc_match_address", "_epc_match_postcode")
            .alias("_prev_rating_rank"),
        )
        .with_columns(
            # First matching rule wins: room-count change, then larger floor
            # area, then a better (lower-ranked) energy rating.
            pl.when(
                pl.col("number_habitable_rooms").is_not_null()
                & pl.col("_prev_rooms").is_not_null()
                & (pl.col("number_habitable_rooms") != pl.col("_prev_rooms"))
            )
            .then(pl.lit("Remodelling"))
            .when(
                pl.col("total_floor_area").is_not_null()
                & pl.col("_prev_area").is_not_null()
                & (pl.col("total_floor_area") > pl.col("_prev_area"))
            )
            .then(pl.lit("Extension"))
            .when(
                pl.col("_rating_rank").is_not_null()
                & pl.col("_prev_rating_rank").is_not_null()
                & (pl.col("_rating_rank") < pl.col("_prev_rating_rank"))
            )
            .then(pl.lit("Renovation"))
            .otherwise(pl.lit(None, dtype=pl.String))
            .alias("_event"),
        )
        .filter(pl.col("_event").is_not_null())
        .with_columns(
            pl.col("inspection_date").dt.year().cast(pl.Int32).alias("_event_year"),
        )
        .group_by("_epc_match_address", "_epc_match_postcode")
        .agg(
            pl.struct(
                pl.col("_event_year").alias("year"),
                pl.col("_event").alias("event"),
            ).alias("renovation_history"),
        )
        .collect()
    )
    event_counts = (
        events["renovation_history"].explode().struct.field("event").value_counts()
    )
    print(f"Renovation events: {events.height} properties with events")
    print(event_counts)

    # Social tenure fork: flag properties that were ever social housing
    social_tenure = (
        epc_base.filter(pl.col("tenure").str.to_lowercase().str.contains("social"))
        .select("_epc_match_address", "_epc_match_postcode")
        .unique()
        .with_columns(pl.lit("Yes").alias("was_council_house"))
        .collect()
    )
    print(f"Former council houses (EPC social tenure): {social_tenure.height}")

    # Left-join events and social tenure back onto dedup EPC
    epc = (
        epc.join(
            events.lazy(),
            on=["_epc_match_address", "_epc_match_postcode"],
            how="left",
        )
        .join(
            social_tenure.lazy(),
            on=["_epc_match_address", "_epc_match_postcode"],
            how="left",
        )
        .with_columns(
            pl.col("was_council_house").fill_null("No"),
        )
    )

    print("EPC dataset")
    print(epc.head().collect())

    # https://www.gov.uk/guidance/about-the-price-paid-data
    property_type_map = {
        "D": "Detached",
        "S": "Semi-Detached",
        "T": "Terraced",
        "F": "Flats/Maisonettes",
        "O": "Other",
    }
    duration_map = {"F": "Freehold", "L": "Leasehold"}

    # price >= MIN_PRICE and ppd_category == "A" (standard open-market sale) are
    # VALUE-QUALITY filters: they gate the price aggregations only. Category B
    # entries (repossessions, bulk/portfolio, power-of-sale transfers) and sub-MIN
    # sales must not pollute latest_price / historical_prices (and the downstream
    # price-per-sqm feature), but they MUST still count for first_transfer_date /
    # old_new so a new-build's genuine earliest transfer year is preserved.
    price_ok = pl.col("price") >= MIN_PRICE
    category_ok = pl.col("ppd_category") == "A"
    quality_ok = price_ok & category_ok

    price_paid = (
        pl.scan_parquet(price_paid_path)
        .select(
            "price",
            "date_of_transfer",
            pl.col("property_type")
            .alias("pp_property_type")
            .replace(property_type_map),
            pl.col("postcode").str.strip_chars(),
            "paon",
            "saon",
            "street",
            "locality",
            "town_city",
            pl.col("duration").replace(duration_map),
            "old_new",
            "ppd_category",
        )
        .filter(pl.col("pp_property_type") != "Other")
        .with_columns(
            pl.concat_str(
                [pl.col("saon"), pl.col("paon"), pl.col("street")],
                separator=" ",
                ignore_nulls=True,
            ).alias("pp_address"),
        )
        .with_columns(
            normalize_address_key(pl.col("pp_address")).alias("_pp_match_address"),
            normalize_postcode_key(pl.col("postcode")).alias("_pp_match_postcode"),
        )
        .filter(pl.col("_pp_match_postcode").is_not_null())
        .with_columns(
            pl.coalesce("_pp_match_address", "pp_address").alias("_pp_group_address"),
            pl.col("_pp_match_postcode").alias("_pp_group_postcode"),
        )
        .filter(pl.col("pp_address").is_not_null())
        .sort("date_of_transfer")
        .group_by("_pp_group_address", "_pp_group_postcode", maintain_order=True)
        .agg(
            pl.col("pp_address").last(),
            pl.col("postcode").last(),
            pl.col("_pp_match_address").last(),
            pl.col("_pp_match_postcode").last(),
            # Price aggregations are restricted to quality-passing sales.
            pl.struct(
                pl.col("date_of_transfer").dt.year().alias("year"),
                pl.col("date_of_transfer").dt.month().cast(pl.UInt8).alias("month"),
                "price",
            )
            .filter(quality_ok)
            .alias("historical_prices"),
            pl.col("pp_property_type").last(),
            pl.col("duration").last(),
            pl.col("price").filter(quality_ok).last().alias("latest_price"),
            pl.col("date_of_transfer").filter(quality_ok).last(),
            # first_transfer_date / old_new reflect the genuine earliest transfer
            # over the full per-group transaction stream (not value-filtered).
            pl.col("date_of_transfer").first().alias("first_transfer_date"),
            pl.col("old_new").first(),
        )
        # Preserve the property universe: previously a property needed >=1 sale
        # >=MIN_PRICE to form a group, so drop groups with no quality-passing sale.
        .filter(pl.col("latest_price").is_not_null())
    )

    print("Price paid dataset")
    print(price_paid.head().collect())

    joined = (
        fuzzy_join_on_postcode(
            left=price_paid,
            right=epc,
            left_address_col="pp_address",
            right_address_col="epc_address",
            left_postcode_col="postcode",
            right_postcode_col="epc_postcode",
        )
        .drop("epc_postcode")
        .collect(engine="streaming")
    )

    matched = joined.filter(
        pl.col("epc_address").is_not_null() & pl.col("pp_address").is_not_null()
    )
    total = joined.height
    # Guard the percentage: an empty join result must not crash the run with a
    # ZeroDivisionError before the (empty) output is written.
    matched_pct = 100 * matched.height / total if total else 0.0
    print(f"Unique properties: {total}")
    print(f"Matched: {matched.height} ({matched_pct:.1f}%)")
    print(f"Unmatched: {total - matched.height}")

    # For new-builds (old_new == "Y"), use the first transaction date year as
    # the exact construction date; otherwise fall back to the EPC age band.
    epc_band_year = epc_band_to_year(pl.col("construction_age_band"))
    transfer_year = (
        pl.col("first_transfer_date").dt.year().cast(pl.UInt16, strict=False)
    )
    is_new_build = pl.col("old_new") == "Y"

    joined = joined.with_columns(
        pl.when(is_new_build & transfer_year.is_not_null())
        .then(transfer_year)
        .otherwise(epc_band_year)
        .alias("construction_age_band"),
        # 0 = exact (new-build transfer year), 1 = approximate (EPC band),
        # null = no construction year available.
        pl.when(is_new_build & transfer_year.is_not_null())
        .then(pl.lit(0, dtype=pl.UInt8))
        .when(epc_band_year.is_not_null())
        .then(pl.lit(1, dtype=pl.UInt8))
        .otherwise(pl.lit(None, dtype=pl.UInt8))
        .alias("is_construction_date_approximate"),
    ).drop(
        [
            "old_new",
            "first_transfer_date",
            "_pp_match_address",
            "_pp_match_postcode",
            "_pp_group_address",
            "_pp_group_postcode",
            "_epc_match_address",
            "_epc_match_postcode",
        ],
        strict=False,
    )

    joined = joined.rename({col: col.lower() for col in joined.columns})

    print(joined.head())

    joined.write_parquet(output_path)
    print(f"Wrote {output_path}")


if __name__ == "__main__":
    main()