perfect-postcode/pipeline/transform/join_epc_pp.py
2026-05-14 08:17:10 +01:00

451 lines
15 KiB
Python

import argparse
import csv
import io
import tempfile
import zipfile
from pathlib import Path
import polars as pl
import pyarrow as pa
import pyarrow.csv as pa_csv
import pyarrow.parquet as pq
from ..utils import (
fuzzy_join_on_postcode,
normalize_address_key,
normalize_postcode_key,
)
# Show every column (no truncation) whenever a polars frame is printed. This is
# process-wide polars configuration, applied at import time, and it controls
# the diagnostic ``print`` output of this pipeline.
pl.Config.set_tbl_cols(-1)
# EPC energy bands mapped to an ordinal rank, A -> 1 through G -> 7. A lower
# rank than on the previous certificate is reported as a "Renovation".
RATING_RANK = {band: rank for rank, band in enumerate("ABCDEFG", start=1)}

# Price Paid transactions priced below this are excluded before the join.
MIN_PRICE = 50_000

# Columns kept from each EPC certificates CSV, under their canonical
# (stripped, lower-case) names and in output order.
EPC_SOURCE_COLUMNS = [
    "address",
    "postcode",
    "current_energy_rating",
    "potential_energy_rating",
    "property_type",
    "built_form",
    "inspection_date",
    "total_floor_area",
    "number_habitable_rooms",
    "floor_height",
    "construction_age_band",
    "tenure",
]
def _normalise_csv_columns(columns: list[str]) -> list[str]:
    """Return *columns* with surrounding whitespace stripped, then lower-cased."""
    return list(map(str.lower, map(str.strip, columns)))
def _clean_string(column: str) -> pl.Expr:
    """Build an expression yielding *column* as a trimmed string.

    The column is cast to ``pl.String`` and leading/trailing whitespace is
    stripped. Values that are empty after stripping become null.
    """
    trimmed = pl.col(column).cast(pl.String).str.strip_chars()
    is_blank = trimmed == ""
    return pl.when(is_blank).then(None).otherwise(trimmed)
def _clean_number(column: str, dtype: pl.DataType) -> pl.Expr:
    """Build an expression yielding *column*, cleaned like a string, cast to *dtype*.

    The cast is non-strict: values that cannot be converted become null
    instead of raising.
    """
    cleaned = _clean_string(column)
    return cleaned.cast(dtype, strict=False)
def _select_epc_columns(raw: pl.LazyFrame) -> pl.LazyFrame:
    """Clean and rename the raw EPC certificate columns.

    *raw* must expose the lower-case ``EPC_SOURCE_COLUMNS``. Each value is
    trimmed and blank strings become null. ``postcode`` and both energy
    ratings are upper-cased. ``total_floor_area`` and ``floor_height`` become
    ``Float64`` and ``number_habitable_rooms`` becomes ``Int16``; unparseable
    values become null. ``address``, ``postcode`` and ``property_type`` gain an
    ``epc_`` prefix. Rows without an address are dropped, and a habitable-room
    count of 0 is replaced with null.
    """

    def upper(column: str) -> pl.Expr:
        return _clean_string(column).str.to_uppercase()

    cleaned = raw.select(
        _clean_string("address").alias("epc_address"),
        upper("postcode").alias("epc_postcode"),
        upper("current_energy_rating").alias("current_energy_rating"),
        upper("potential_energy_rating").alias("potential_energy_rating"),
        _clean_string("property_type").alias("epc_property_type"),
        *(_clean_string(name).alias(name) for name in ("built_form", "inspection_date")),
        _clean_number("total_floor_area", pl.Float64).alias("total_floor_area"),
        _clean_number("number_habitable_rooms", pl.Int16).alias("number_habitable_rooms"),
        _clean_number("floor_height", pl.Float64).alias("floor_height"),
        *(_clean_string(name).alias(name) for name in ("construction_age_band", "tenure")),
    )
    rooms = pl.col("number_habitable_rooms")
    return cleaned.filter(pl.col("epc_address").is_not_null()).with_columns(
        # A recorded room count of 0 is treated as missing.
        pl.when(rooms == 0).then(None).otherwise(rooms).alias("number_habitable_rooms"),
    )
def _certificate_member_names(zip_file: zipfile.ZipFile) -> list[str]:
    """Return the sorted names of the certificate CSV members of *zip_file*.

    A member qualifies when it is not a directory entry (trailing ``/``), its
    base name starts with ``certificates``, and its full name ends with
    ``.csv``. Both name checks are case-insensitive.
    """

    def is_certificate_csv(name: str) -> bool:
        if name.endswith("/"):
            return False
        base_name = Path(name).name.lower()
        return base_name.startswith("certificates") and name.lower().endswith(".csv")

    return sorted(filter(is_certificate_csv, zip_file.namelist()))
def _read_zip_csv_header(zip_file: zipfile.ZipFile, member_name: str) -> list[str]:
    """Return the first CSV row (the header) of *member_name* in *zip_file*.

    The member is decoded as UTF-8, and a leading byte-order mark is discarded.

    Raises:
        ValueError: if the member contains no rows.
    """
    with zip_file.open(member_name) as member_stream:
        rows = csv.reader(
            io.TextIOWrapper(member_stream, encoding="utf-8-sig", newline="")
        )
        try:
            header = next(rows)
        except StopIteration as empty:
            raise ValueError(f"EPC CSV member is empty: {member_name}") from empty
    return header
def _source_columns_for_header(header: list[str]) -> list[str]:
    """Return, for each name in ``EPC_SOURCE_COLUMNS``, its spelling in *header*.

    Header names are matched after stripping and lower-casing; if several
    normalise to the same value, the last one wins. A wanted column that is
    absent from *header* is returned under its canonical name unchanged.
    """
    source_by_key = dict(zip(_normalise_csv_columns(header), header))
    return [source_by_key.get(wanted, wanted) for wanted in EPC_SOURCE_COLUMNS]
def _zip_certificates_to_parquet(zip_path: Path, output_path: Path) -> None:
    """Stream every certificate CSV in the zip at *zip_path* into one Parquet file.

    Only ``EPC_SOURCE_COLUMNS`` are kept. They are read as nullable strings and
    written under their canonical lower-case names with zstd compression.
    Columns missing from a member are written as nulls.

    Raises:
        ValueError: if *zip_path* is not a readable zip archive, if it has no
            certificate CSV members, or if a member is empty.
    """
    target_schema = pa.schema([(name, pa.string()) for name in EPC_SOURCE_COLUMNS])
    # CSV members are read incrementally in 64 MiB blocks.
    read_options = pa_csv.ReadOptions(block_size=64 * 1024 * 1024)
    with pq.ParquetWriter(output_path, schema=target_schema, compression="zstd") as writer:
        try:
            archive = zipfile.ZipFile(zip_path)
        except zipfile.BadZipFile as exc:
            raise ValueError(
                f"{zip_path} is not a readable EPC zip archive; re-download "
                "domestic-csv.zip and try again"
            ) from exc
        with archive:
            members = _certificate_member_names(archive)
            if not members:
                raise ValueError(f"No certificate CSV files found in {zip_path}")
            for member_name in members:
                print(f"Reading EPC certificates from {member_name}")
                # Header spellings vary between members, so resolve them per file.
                wanted = _source_columns_for_header(
                    _read_zip_csv_header(archive, member_name)
                )
                convert_options = pa_csv.ConvertOptions(
                    include_columns=wanted,
                    include_missing_columns=True,
                    column_types=dict.fromkeys(wanted, pa.string()),
                    strings_can_be_null=True,
                )
                with archive.open(member_name) as member:
                    reader = pa_csv.open_csv(
                        member,
                        read_options=read_options,
                        convert_options=convert_options,
                    )
                    for batch in reader:
                        if batch.num_rows:
                            # include_columns fixes the column order, so a
                            # positional rename yields the canonical names.
                            writer.write_batch(batch.rename_columns(EPC_SOURCE_COLUMNS))
def _scan_epc_certificates(epc_path: Path, temp_dir: Path) -> pl.LazyFrame:
    """Lazily load cleaned EPC certificates from a CSV file or a zip of CSVs.

    A path ending in ``.zip`` (any case) is first flattened into
    ``epc-certificates.parquet`` inside *temp_dir*. Any other path is scanned
    as CSV with every column read as a string and header names stripped and
    lower-cased. Both sources are then passed through ``_select_epc_columns``.
    """
    if epc_path.suffix.lower() != ".zip":
        csv_frame = pl.scan_csv(
            epc_path,
            infer_schema=False,
            with_column_names=_normalise_csv_columns,
        )
        return _select_epc_columns(csv_frame)
    flattened = temp_dir / "epc-certificates.parquet"
    _zip_certificates_to_parquet(epc_path, flattened)
    return _select_epc_columns(pl.scan_parquet(flattened))
def main():
    """Command-line entry point: parse arguments and run the EPC/Price Paid join.

    Intermediate files are written to a temporary directory that is removed
    when the run ends.
    """
    parser = argparse.ArgumentParser(description="Fuzzy join EPC and Price Paid data")
    for flag, help_text in (
        ("--epc", "EPC certificates CSV file or zip"),
        ("--price-paid", "Price paid parquet file"),
        ("--output", "Output parquet file path"),
    ):
        parser.add_argument(flag, type=Path, required=True, help=help_text)
    args = parser.parse_args()
    with tempfile.TemporaryDirectory(prefix="epc_certificates_") as scratch:
        _run(args.epc, args.price_paid, args.output, Path(scratch))
# Normalised (address, postcode) pair identifying one property on the EPC side;
# shared by the dedup, renovation-event and social-tenure forks and their joins.
_EPC_MATCH_KEYS = ("_epc_match_address", "_epc_match_postcode")


def _latest_epc_per_property(epc_base: pl.LazyFrame) -> pl.LazyFrame:
    """Keep the most recently inspected certificate for each EPC property key.

    Undated certificates sort after dated ones, so one is kept only when the
    property has no dated certificate. ``tenure`` is dropped here; the
    social-tenure flag is derived from all certificates separately.
    NOTE(review): assumes ``inspection_date`` strings are ISO ``YYYY-MM-DD``,
    so lexical order is chronological; confirm against the EPC export.
    """
    return (
        # nulls_last: polars places nulls first by default, even when sorting
        # descending, which would make an undated certificate the "latest".
        epc_base.sort("inspection_date", descending=True, nulls_last=True)
        # group_by keeps row order within each group, so first() is the
        # latest certificate.
        .group_by(*_EPC_MATCH_KEYS)
        .first()
        .drop("tenure")
    )


def _renovation_events(epc_base: pl.LazyFrame) -> pl.DataFrame:
    """Detect events between consecutive certificates of each EPC property.

    Certificates are ordered by ``inspection_date``; undated ones sort first
    (polars' default) and so are treated as the oldest. Each certificate is
    compared with the property's previous one, and the first matching rule
    wins:

    * habitable-room count changed -> ``"Remodelling"``
    * floor area increased -> ``"Extension"``
    * energy rating improved (lower ``RATING_RANK``) -> ``"Renovation"``

    Returns one row per property with at least one event. Its
    ``renovation_history`` column lists ``{year, event}`` structs in
    inspection order.
    """

    def previous(column: str) -> pl.Expr:
        # Value of *column* on the same property's preceding certificate.
        return pl.col(column).shift(1).over(*_EPC_MATCH_KEYS)

    def both_known(column: str, prev_column: str) -> pl.Expr:
        return pl.col(column).is_not_null() & pl.col(prev_column).is_not_null()

    # Collected eagerly: .over() window functions don't work in the streaming
    # engine, and the joined result is later collected with engine="streaming".
    return (
        epc_base.sort("inspection_date")
        .with_columns(
            pl.col("current_energy_rating")
            .replace_strict(RATING_RANK, default=None, return_dtype=pl.Int32)
            .alias("_rating_rank"),
        )
        .with_columns(
            previous("number_habitable_rooms").alias("_prev_rooms"),
            previous("total_floor_area").alias("_prev_area"),
            previous("_rating_rank").alias("_prev_rating_rank"),
        )
        .with_columns(
            pl.when(
                both_known("number_habitable_rooms", "_prev_rooms")
                & (pl.col("number_habitable_rooms") != pl.col("_prev_rooms"))
            )
            .then(pl.lit("Remodelling"))
            .when(
                both_known("total_floor_area", "_prev_area")
                & (pl.col("total_floor_area") > pl.col("_prev_area"))
            )
            .then(pl.lit("Extension"))
            .when(
                both_known("_rating_rank", "_prev_rating_rank")
                & (pl.col("_rating_rank") < pl.col("_prev_rating_rank"))
            )
            .then(pl.lit("Renovation"))
            .otherwise(pl.lit(None, dtype=pl.String))
            .alias("_event"),
        )
        .filter(pl.col("_event").is_not_null())
        .with_columns(
            pl.col("inspection_date")
            .cast(pl.String)
            .str.slice(0, 4)
            # Non-strict: a malformed date yields a null year rather than
            # aborting the whole pipeline.
            .cast(pl.Int32, strict=False)
            .alias("_event_year"),
        )
        .group_by(*_EPC_MATCH_KEYS)
        .agg(
            pl.struct(
                pl.col("_event_year").alias("year"),
                pl.col("_event").alias("event"),
            ).alias("renovation_history"),
        )
        .collect()
    )


def _social_tenure_properties(epc_base: pl.LazyFrame) -> pl.DataFrame:
    """Return EPC property keys with any certificate whose tenure mentions "social".

    The match is case-insensitive, and certificates with a null tenure never
    match. Each key appears once, with ``was_council_house`` set to ``"Yes"``.
    """
    return (
        epc_base.filter(pl.col("tenure").str.to_lowercase().str.contains("social"))
        .select(*_EPC_MATCH_KEYS)
        .unique()
        .with_columns(pl.lit("Yes").alias("was_council_house"))
        .collect()
    )


def _price_paid_properties(price_paid_path: Path) -> pl.LazyFrame:
    """Collapse Price Paid transactions into one lazy row per property.

    Transactions are dropped when their mapped property type is ``"Other"`` (or
    null), their price is below ``MIN_PRICE``, or their normalised postcode key
    is null. Properties are grouped on the normalised address, falling back to
    the raw ``saon paon street`` string, plus the normalised postcode. Each row
    carries the latest transfer's details, the chronological
    ``historical_prices`` list, the first transfer date and the first
    transfer's ``old_new`` flag.
    """
    # https://www.gov.uk/guidance/about-the-price-paid-data
    property_type_map = {
        "D": "Detached",
        "S": "Semi-Detached",
        "T": "Terraced",
        "F": "Flats/Maisonettes",
        "O": "Other",
    }
    duration_map = {"F": "Freehold", "L": "Leasehold"}
    return (
        pl.scan_parquet(price_paid_path)
        .select(
            "price",
            "date_of_transfer",
            pl.col("property_type")
            .alias("pp_property_type")
            .replace(property_type_map),
            pl.col("postcode").str.strip_chars(),
            "paon",
            "saon",
            "street",
            "locality",
            "town_city",
            pl.col("duration").replace(duration_map),
            "old_new",
        )
        .filter(pl.col("pp_property_type") != "Other")
        .filter(pl.col("price") >= MIN_PRICE)
        .with_columns(
            pl.concat_str(
                [pl.col("saon"), pl.col("paon"), pl.col("street")],
                separator=" ",
                ignore_nulls=True,
            ).alias("pp_address"),
        )
        .with_columns(
            normalize_address_key(pl.col("pp_address")).alias("_pp_match_address"),
            normalize_postcode_key(pl.col("postcode")).alias("_pp_match_postcode"),
        )
        .filter(pl.col("_pp_match_postcode").is_not_null())
        .with_columns(
            pl.coalesce("_pp_match_address", "pp_address").alias("_pp_group_address"),
            pl.col("_pp_match_postcode").alias("_pp_group_postcode"),
        )
        .filter(pl.col("pp_address").is_not_null())
        # Sorted by date so last()/first() per group mean latest/earliest.
        .sort("date_of_transfer")
        .group_by("_pp_group_address", "_pp_group_postcode", maintain_order=True)
        .agg(
            pl.col("pp_address").last(),
            pl.col("postcode").last(),
            pl.col("_pp_match_address").last(),
            pl.col("_pp_match_postcode").last(),
            pl.struct(
                pl.col("date_of_transfer").dt.year().alias("year"),
                pl.col("date_of_transfer").dt.month().cast(pl.UInt8).alias("month"),
                "price",
            ).alias("historical_prices"),
            pl.col("pp_property_type").last(),
            pl.col("duration").last(),
            pl.col("price").last().alias("latest_price"),
            pl.col("date_of_transfer").last(),
            pl.col("date_of_transfer").first().alias("first_transfer_date"),
            pl.col("old_new").first(),
        )
    )


def _print_match_summary(joined: pl.DataFrame) -> None:
    """Print how many joined properties were matched to an EPC certificate."""
    matched = joined.filter(
        pl.col("epc_address").is_not_null() & pl.col("pp_address").is_not_null()
    ).height
    total = joined.height
    # Guard the percentage: an empty join would otherwise raise
    # ZeroDivisionError before the output file is written.
    share = 100 * matched / total if total else 0.0
    print(f"Unique properties: {total}")
    print(f"Matched: {matched} ({share:.1f}%)")
    print(f"Unmatched: {total - matched}")


def _finalise_output(joined: pl.DataFrame) -> pl.DataFrame:
    """Derive the construction year, drop internal columns and lower-case names.

    New builds (``old_new == "Y"``) with a known first transfer use that
    transfer's year as an exact construction year
    (``is_construction_date_approximate`` = 0). All other rows fall back to
    the first four-digit year in the EPC ``construction_age_band`` (flag 1).
    The flag is null when neither source yields a year.
    """
    epc_band_year = (
        pl.col("construction_age_band")
        .str.replace("England and Wales: ", "")
        .str.replace(" onwards", "")
        .str.extract(r"(\d{4})", 1)
        .cast(pl.UInt16, strict=False)
    )
    transfer_year = pl.col("first_transfer_date").dt.year().cast(pl.UInt16, strict=False)
    use_transfer_year = (pl.col("old_new") == "Y") & transfer_year.is_not_null()
    # Both expressions read the ORIGINAL construction_age_band, because
    # with_columns evaluates all expressions against its input frame.
    finalised = joined.with_columns(
        pl.when(use_transfer_year)
        .then(transfer_year)
        .otherwise(epc_band_year)
        .alias("construction_age_band"),
        pl.when(use_transfer_year)
        .then(pl.lit(0, dtype=pl.UInt8))
        .when(epc_band_year.is_not_null())
        .then(pl.lit(1, dtype=pl.UInt8))
        .otherwise(pl.lit(None, dtype=pl.UInt8))
        .alias("is_construction_date_approximate"),
    ).drop(
        [
            "old_new",
            "first_transfer_date",
            "_pp_match_address",
            "_pp_match_postcode",
            "_pp_group_address",
            "_pp_group_postcode",
            "_epc_match_address",
            "_epc_match_postcode",
        ],
        strict=False,
    )
    return finalised.rename({col: col.lower() for col in finalised.columns})


def _run(epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Path):
    """Fuzzy-join EPC certificates onto Price Paid properties and write Parquet.

    Args:
        epc_path: EPC certificates CSV, or a zip of certificate CSVs.
        price_paid_path: Price Paid Parquet file.
        output_path: destination Parquet file.
        temp_dir: scratch directory for intermediate files (used when
            *epc_path* is a zip).

    Progress and summary statistics are printed to stdout.
    """
    epc_base = _scan_epc_certificates(epc_path, temp_dir).with_columns(
        normalize_address_key(pl.col("epc_address")).alias("_epc_match_address"),
        normalize_postcode_key(pl.col("epc_postcode")).alias("_epc_match_postcode"),
    )

    # Dedup fork: one (latest) certificate per EPC property.
    epc = _latest_epc_per_property(epc_base)

    # Events fork: renovation history across all certificates.
    events = _renovation_events(epc_base)
    event_counts = (
        events["renovation_history"].explode().struct.field("event").value_counts()
    )
    print(f"Renovation events: {events.height} properties with events")
    print(event_counts)

    # Social tenure fork: properties that were ever social housing.
    social_tenure = _social_tenure_properties(epc_base)
    print(f"Former council houses (EPC social tenure): {social_tenure.height}")

    keys = list(_EPC_MATCH_KEYS)
    epc = (
        epc.join(events.lazy(), on=keys, how="left")
        .join(social_tenure.lazy(), on=keys, how="left")
        .with_columns(pl.col("was_council_house").fill_null("No"))
    )
    print("EPC dataset")
    print(epc.head().collect())

    price_paid = _price_paid_properties(price_paid_path)
    print("Price paid dataset")
    print(price_paid.head().collect())

    joined = (
        fuzzy_join_on_postcode(
            left=price_paid,
            right=epc,
            left_address_col="pp_address",
            right_address_col="epc_address",
            left_postcode_col="postcode",
            right_postcode_col="epc_postcode",
        )
        .drop("epc_postcode")
        .collect(engine="streaming")
    )
    _print_match_summary(joined)

    joined = _finalise_output(joined)
    print(joined.head())
    joined.write_parquet(output_path)
    print(f"Wrote {output_path}")
# Script entry point. The relative ``..utils`` import means this module must
# be run as part of its package (e.g. with ``python -m``), not as a bare file.
if __name__ == "__main__":
    main()