549 lines
20 KiB
Python
549 lines
20 KiB
Python
import argparse
|
|
import csv
|
|
import io
|
|
import tempfile
|
|
import zipfile
|
|
from pathlib import Path
|
|
|
|
import polars as pl
|
|
import pyarrow as pa
|
|
import pyarrow.csv as pa_csv
|
|
import pyarrow.parquet as pq
|
|
|
|
from pipeline.local_temp import local_tmp_dir
|
|
|
|
from ..utils import (
|
|
fuzzy_join_on_postcode,
|
|
normalize_address_key,
|
|
normalize_postcode_key,
|
|
)
|
|
|
|
# Show every column (no truncation) when the progress prints in this script
# render DataFrames.
pl.Config.set_tbl_cols(-1)
|
|
|
|
# Energy-efficiency bands ranked from best (A=1) to worst (G=7): a numerically
# lower rank means a better rating.
RATING_RANK = {band: rank for rank, band in enumerate("ABCDEFG", start=1)}

# Minimum sale price (GBP) admitted to the price aggregations. A fixed nominal
# floor ages badly because prices inflate over time: £50k was an ordinary price
# for a 1990s house, so the former 50k floor discarded roughly a third of
# genuine 1990s open-market sales (and with them any property whose only sales
# were old or cheap), pushing early-year price history upward. 10k readmits the
# large [10k, 50k) band of real cheaper sales while still excluding nominal or
# junk transfers such as £1. Some genuine sub-10k sales are still dropped; that
# is an accepted, conservative tradeoff.
MIN_PRICE = 10_000

# Plausible construction-year window. Band-derived years outside it (noise such
# as 1012 or 2202) become null instead of being published.
MIN_BUILD_YEAR, MAX_BUILD_YEAR = 1700, 2030

# Sanity bounds on raw EPC dimensions. Lodgements contain data-entry errors
# (0 m storey heights, a 116 m "interior height", 9,210 m² floor areas,
# 99 habitable rooms) that would otherwise be copied straight into the
# published per-property columns. Out-of-range values are nulled (treated as
# unknown). The bounds are intentionally generous so only obvious errors go.
MIN_FLOOR_HEIGHT_M = 1.5  # a storey lower than this is not habitable
MAX_FLOOR_HEIGHT_M = 6.0  # taller than this is a data error, not a storey
MAX_TOTAL_FLOOR_AREA_M2 = 2000.0  # roughly 21,500 sq ft; beyond is bulk/garbage
MAX_HABITABLE_ROOMS = 20  # more rooms than this indicates a data error
|
|
|
|
|
|
def epc_band_to_year(band: pl.Expr) -> pl.Expr:
    """Convert an EPC construction age band into one representative build year.

    Bands are ranges such as ``1950-1966``; the rounded MIDPOINT (1958) is
    returned instead of the lower bound, which used to make every band-derived
    year roughly 10-15 years too young. The open-ended ``before 1900`` style of
    band is too broad to pin down and yields null. Single-year and
    ``... onwards`` bands yield that year, and an input that is already a bare
    year (e.g. the output of a previous call) comes back unchanged. Anything
    outside [MIN_BUILD_YEAR, MAX_BUILD_YEAR] becomes null.

    Args:
        band: Expression holding the raw band text (or an already-derived year).

    Returns:
        A ``UInt16`` year expression, null where no plausible year exists.
    """
    cleaned = band.cast(pl.Utf8)
    # Drop the regional prefix first, then the open-ended suffix.
    for noise in ("England and Wales: ", " onwards"):
        cleaned = cleaned.str.replace(noise, "")

    start = cleaned.str.extract(r"(\d{4})", 1).cast(pl.Int32, strict=False)
    end = cleaned.str.extract(r"(\d{4})\D+(\d{4})", 2).cast(pl.Int32, strict=False)
    midpoint = ((start + end) / 2).round(0).cast(pl.Int32)

    representative = (
        pl.when(cleaned.str.starts_with("before "))
        .then(None)
        .when(end.is_not_null())
        .then(midpoint)
        .otherwise(start)
    )
    plausible = representative.is_between(MIN_BUILD_YEAR, MAX_BUILD_YEAR)
    return (
        pl.when(plausible)
        .then(representative)
        .otherwise(None)
        .cast(pl.UInt16, strict=False)
    )
|
|
|
|
|
|
# Raw EPC certificate columns this pipeline reads, by their normalised
# (stripped, lower-case) header names. Order matters: the zip path writes its
# staging Parquet columns in exactly this order.
EPC_SOURCE_COLUMNS = """
    address postcode uprn
    current_energy_rating potential_energy_rating
    property_type built_form inspection_date
    total_floor_area number_habitable_rooms floor_height
    construction_age_band tenure
""".split()
|
|
|
|
|
|
def _normalise_csv_columns(columns: list[str]) -> list[str]:
    """Canonicalise CSV header names: trim surrounding whitespace, then lower-case."""
    return [name.lower() for name in map(str.strip, columns)]
|
|
|
|
|
|
def _clean_string(column: str) -> pl.Expr:
    """Return ``column`` as whitespace-trimmed text, mapping empty strings to null.

    Null inputs stay null, so blank and missing values look the same downstream.
    """
    trimmed = pl.col(column).cast(pl.String).str.strip_chars()
    has_content = trimmed.str.len_chars() > 0
    return pl.when(has_content).then(trimmed).otherwise(None)
|
|
|
|
|
|
def _clean_number(column: str, dtype: pl.DataType) -> pl.Expr:
    """Parse a text column as ``dtype`` after blank-to-null cleaning.

    The cast is non-strict, so values that fail to parse become null (treated
    as unknown) instead of raising.
    """
    as_text = _clean_string(column)
    return as_text.cast(dtype, strict=False)
|
|
|
|
|
|
def _select_epc_columns(raw: pl.LazyFrame) -> pl.LazyFrame:
    """Project raw EPC certificate rows onto the typed, cleaned columns used downstream.

    Every source column arrives as text. Strings are trimmed and blanks become
    null. Numeric fields are parsed leniently (unparseable -> null), and
    ``inspection_date`` is parsed as an ISO date. Rows without an address are
    dropped, and implausible dimensions are nulled using the module-level
    plausibility bounds.

    Args:
        raw: Lazy frame whose columns use the lower-case EPC source names.

    Returns:
        Lazy frame with ``epc_address``, ``epc_postcode``, ``uprn``,
        ``current_energy_rating``, ``potential_energy_rating``,
        ``epc_property_type``, ``built_form``, ``inspection_date``,
        ``total_floor_area``, ``number_habitable_rooms``, ``floor_height``,
        ``construction_age_band`` and ``tenure``.
    """
    rooms = pl.col("number_habitable_rooms")
    height = pl.col("floor_height")
    area = pl.col("total_floor_area")
    return (
        raw.select(
            _clean_string("address").alias("epc_address"),
            _clean_string("postcode").str.to_uppercase().alias("epc_postcode"),
            # UPRN keys an exact listing->EPC join downstream (~99% populated).
            _clean_string("uprn").alias("uprn"),
            _clean_string("current_energy_rating")
            .str.to_uppercase()
            .alias("current_energy_rating"),
            _clean_string("potential_energy_rating")
            .str.to_uppercase()
            .alias("potential_energy_rating"),
            _clean_string("property_type").alias("epc_property_type"),
            _clean_string("built_form").alias("built_form"),
            # Parse to a real Date once (unparseable/blank -> null).
            # - Dedup can then sort newest-first with nulls_last.
            # - The events step can then use dt.year().
            # A lexicographic string sort would let a null/garbled date win
            # under Polars' default nulls-first descending order.
            # EPC inspection dates are ISO (YYYY-MM-DD).
            _clean_string("inspection_date")
            .str.to_date(format="%Y-%m-%d", strict=False)
            .alias("inspection_date"),
            _clean_number("total_floor_area", pl.Float64).alias("total_floor_area"),
            _clean_number("number_habitable_rooms", pl.Int16).alias(
                "number_habitable_rooms"
            ),
            _clean_number("floor_height", pl.Float64).alias("floor_height"),
            _clean_string("construction_age_band").alias("construction_age_band"),
            _clean_string("tenure").alias("tenure"),
        )
        .filter(pl.col("epc_address").is_not_null())
        .with_columns(
            # Null implausible EPC dimensions so data-entry errors don't reach
            # the published per-property columns (interior height, total floor
            # area, number of bedrooms & living rooms). Treated as unknown.
            pl.when((rooms >= 1) & (rooms <= MAX_HABITABLE_ROOMS))
            .then(rooms)
            .otherwise(None)
            .alias("number_habitable_rooms"),
            pl.when(height.is_between(MIN_FLOOR_HEIGHT_M, MAX_FLOOR_HEIGHT_M))
            .then(height)
            .otherwise(None)
            .alias("floor_height"),
            # Floor area needs a lower bound too, like rooms and height. A zero
            # or negative area is an entry error, not a tiny home. Left in, it
            # would divide by zero in the price-per-sqm feature. It would also
            # make the next certificate of the property look like an
            # "Extension".
            pl.when((area > 0) & (area <= MAX_TOTAL_FLOOR_AREA_M2))
            .then(area)
            .otherwise(None)
            .alias("total_floor_area"),
        )
    )
|
|
|
|
|
|
def _certificate_member_names(zip_file: zipfile.ZipFile) -> list[str]:
    """List the certificate CSV members of an EPC bulk-download archive.

    A member qualifies when all of the following hold:
      - it is a file, not a directory entry;
      - its base name starts with ``certificates``;
      - its full name ends with ``.csv``.
    Both name checks are case-insensitive. Names are returned sorted so the
    read order is deterministic.
    """
    selected = []
    for info in zip_file.infolist():
        if info.is_dir():
            continue
        name = info.filename
        is_certificate = Path(name).name.lower().startswith("certificates")
        if is_certificate and name.lower().endswith(".csv"):
            selected.append(name)
    return sorted(selected)
|
|
|
|
|
|
def _read_zip_csv_header(zip_file: zipfile.ZipFile, member_name: str) -> list[str]:
    """Return the first CSV row (the header) of ``member_name`` inside ``zip_file``.

    The member is decoded as UTF-8 with any leading byte-order mark removed.
    Parsing uses the ``csv`` module, so quoted header cells are honoured.

    Raises:
        ValueError: if the member contains no rows at all.
    """
    with zip_file.open(member_name) as raw_member:
        decoded = io.TextIOWrapper(raw_member, encoding="utf-8-sig", newline="")
        rows = csv.reader(decoded)
        try:
            header = next(rows)
        except StopIteration as exc:
            raise ValueError(f"EPC CSV member is empty: {member_name}") from exc
    return header
|
|
|
|
|
|
def _source_columns_for_header(header: list[str]) -> list[str]:
    """Translate EPC_SOURCE_COLUMNS into the member's own header spellings.

    Each wanted (normalised) column is mapped back to the exact header text
    that normalises to it. If several header cells normalise to the same
    name, the last one wins. A column absent from the header is returned
    under its normalised name, so the CSV reader can fill it with nulls. The
    result is in EPC_SOURCE_COLUMNS order.
    """
    spelling_for = dict(zip(_normalise_csv_columns(header), header))
    return [spelling_for.get(wanted, wanted) for wanted in EPC_SOURCE_COLUMNS]
|
|
|
|
|
|
def _zip_certificates_to_parquet(zip_path: Path, output_path: Path) -> None:
    """Stream every certificates CSV inside ``zip_path`` into one Parquet file.

    Only EPC_SOURCE_COLUMNS are kept, all typed as strings, and written under
    their normalised names with zstd compression. The Parquet writer is
    opened before the archive and is always closed on exit.

    Raises:
        ValueError: if ``zip_path`` is not a readable zip archive, or if it
            contains no certificate CSV members.
    """
    writer = pq.ParquetWriter(
        output_path,
        schema=pa.schema([(name, pa.string()) for name in EPC_SOURCE_COLUMNS]),
        compression="zstd",
    )
    try:
        try:
            archive = zipfile.ZipFile(zip_path)
        except zipfile.BadZipFile as exc:
            raise ValueError(
                f"{zip_path} is not a readable EPC zip archive; re-download "
                "domestic-csv.zip and try again"
            ) from exc

        with archive:
            members = _certificate_member_names(archive)
            if not members:
                raise ValueError(f"No certificate CSV files found in {zip_path}")
            for member_name in members:
                _append_certificate_member(archive, member_name, writer)
    finally:
        writer.close()


def _append_certificate_member(
    archive: zipfile.ZipFile, member_name: str, writer: pq.ParquetWriter
) -> None:
    """Append the EPC_SOURCE_COLUMNS of one certificates CSV member to ``writer``."""
    print(f"Reading EPC certificates from {member_name}")
    wanted = _source_columns_for_header(_read_zip_csv_header(archive, member_name))
    convert_options = pa_csv.ConvertOptions(
        include_columns=wanted,
        # Missing columns become all-null string columns (typed via
        # column_types), so every batch matches the writer's schema.
        include_missing_columns=True,
        column_types=dict.fromkeys(wanted, pa.string()),
        strings_can_be_null=True,
    )
    read_options = pa_csv.ReadOptions(block_size=64 * 1024 * 1024)

    with archive.open(member_name) as member:
        reader = pa_csv.open_csv(
            member,
            read_options=read_options,
            convert_options=convert_options,
        )
        for batch in reader:
            if batch.num_rows:
                # Columns come back in `wanted` order, i.e. EPC_SOURCE_COLUMNS
                # order, so renaming positionally restores the normalised names.
                writer.write_batch(batch.rename_columns(EPC_SOURCE_COLUMNS))
|
|
|
|
|
|
def _scan_epc_certificates(epc_path: Path, temp_dir: Path) -> pl.LazyFrame:
    """Lazily load EPC certificates from a CSV file or a zip of certificate CSVs.

    A ``.zip`` input (suffix compared case-insensitively) is first flattened
    into ``temp_dir/epc-certificates.parquet``. Any other path is scanned as a
    CSV, with every column read as text and header names normalised. Either
    way the rows pass through ``_select_epc_columns``.
    """
    if epc_path.suffix.lower() != ".zip":
        raw_csv = pl.scan_csv(
            epc_path,
            infer_schema=False,
            with_column_names=_normalise_csv_columns,
        )
        return _select_epc_columns(raw_csv)

    staged_parquet = temp_dir / "epc-certificates.parquet"
    _zip_certificates_to_parquet(epc_path, staged_parquet)
    return _select_epc_columns(pl.scan_parquet(staged_parquet))
|
|
|
|
|
|
def main():
    """Command-line entry point: parse arguments and run the join in a scratch dir."""
    parser = argparse.ArgumentParser(description="Fuzzy join EPC and Price Paid data")
    for flag, help_text in (
        ("--epc", "EPC certificates CSV file or zip"),
        ("--price-paid", "Price paid parquet file"),
        ("--output", "Output parquet file path"),
    ):
        parser.add_argument(flag, type=Path, required=True, help=help_text)
    args = parser.parse_args()

    # Staging files (e.g. the zip -> Parquet conversion) live in a temporary
    # directory under local_tmp_dir() that is removed when the block exits.
    with tempfile.TemporaryDirectory(
        prefix="epc_certificates_", dir=local_tmp_dir()
    ) as scratch_dir:
        _run(args.epc, args.price_paid, args.output, Path(scratch_dir))
|
|
|
|
|
|
# Normalised (address, postcode) pair identifying one property on the EPC side.
_EPC_KEYS = ["_epc_match_address", "_epc_match_postcode"]


def _run(epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Path):
    """Join EPC certificates onto Price Paid properties and write the result.

    Steps:
      1. Load EPC certificates and key them by normalised address + postcode.
      2. From the full certificate stream, derive a renovation history and an
         ever-social-tenure flag per property. Attach both to the newest
         certificate of each property.
      3. Aggregate Price Paid transactions per property.
      4. Fuzzy-join the two on postcode/address and derive a construction
         year. Lower-case all column names and write ``output_path``.

    Args:
        epc_path: EPC certificates CSV, or a zip containing certificates CSVs.
        price_paid_path: Price Paid Parquet file.
        output_path: Destination Parquet file.
        temp_dir: Scratch directory for the zip -> Parquet staging file.
    """
    epc_base = _scan_epc_certificates(epc_path, temp_dir).with_columns(
        normalize_address_key(pl.col("epc_address")).alias("_epc_match_address"),
        normalize_postcode_key(pl.col("epc_postcode")).alias("_epc_match_postcode"),
    )

    events = _epc_renovation_events(epc_base)
    event_counts = (
        events["renovation_history"].explode().struct.field("event").value_counts()
    )
    print(f"Renovation events: {events.height} properties with events")
    print(event_counts)

    social_tenure = _epc_social_tenure(epc_base)
    print(f"Former council houses (EPC social tenure): {social_tenure.height}")

    # Left-join events and social tenure back onto the deduplicated EPC rows.
    epc = (
        _latest_epc_per_property(epc_base)
        .join(events.lazy(), on=_EPC_KEYS, how="left")
        .join(social_tenure.lazy(), on=_EPC_KEYS, how="left")
        .with_columns(pl.col("was_council_house").fill_null("No"))
    )
    print("EPC dataset")
    print(epc.head().collect())

    price_paid = _price_paid_properties(price_paid_path)
    print("Price paid dataset")
    print(price_paid.head().collect())

    joined = (
        fuzzy_join_on_postcode(
            left=price_paid,
            right=epc,
            left_address_col="pp_address",
            right_address_col="epc_address",
            left_postcode_col="postcode",
            right_postcode_col="epc_postcode",
        )
        .drop("epc_postcode")
        .collect(engine="streaming")
    )
    _print_match_stats(joined)

    joined = _with_construction_year(joined)
    joined = joined.rename({col: col.lower() for col in joined.columns})

    print(joined.head())
    joined.write_parquet(output_path)
    print(f"Wrote {output_path}")


def _latest_epc_per_property(epc_base: pl.LazyFrame) -> pl.LazyFrame:
    """Keep the newest certificate per EPC property key, dropping ``tenure``."""
    # inspection_date is a typed Date (see _select_epc_columns). nulls_last
    # keeps a real-dated certificate ahead of a null/unparseable-dated one, so
    # the genuinely newest certificate is chosen.
    return (
        epc_base.sort("inspection_date", descending=True, nulls_last=True)
        .group_by(*_EPC_KEYS)
        .first()
        .drop("tenure")
    )


def _epc_renovation_events(epc_base: pl.LazyFrame) -> pl.DataFrame:
    """Detect renovation events between consecutive certificates of a property.

    Certificates are ordered by inspection date, and each is compared with the
    same property's previous one. The first matching rule wins:
      - "Remodelling": the habitable-room count changed.
      - "Extension": the floor area grew.
      - "Renovation": the energy rating improved.

    Returns one row per property with at least one event. Each row holds a
    ``renovation_history`` list of ``{year, event}`` structs.
    """
    # Collected eagerly: .over() window functions were historically not
    # supported by the streaming engine used for the final join.
    # NOTE(review): re-check against the installed Polars version.
    def previous(column: str) -> pl.Expr:
        # Value from the property's preceding certificate (null for the first).
        return pl.col(column).shift(1).over(*_EPC_KEYS)

    rooms, prev_rooms = pl.col("number_habitable_rooms"), pl.col("_prev_rooms")
    area, prev_area = pl.col("total_floor_area"), pl.col("_prev_area")
    rank, prev_rank = pl.col("_rating_rank"), pl.col("_prev_rating_rank")

    return (
        epc_base.sort("inspection_date")
        .with_columns(
            pl.col("current_energy_rating")
            .replace_strict(RATING_RANK, default=None, return_dtype=pl.Int32)
            .alias("_rating_rank"),
        )
        .with_columns(
            previous("number_habitable_rooms").alias("_prev_rooms"),
            previous("total_floor_area").alias("_prev_area"),
            previous("_rating_rank").alias("_prev_rating_rank"),
        )
        .with_columns(
            pl.when(rooms.is_not_null() & prev_rooms.is_not_null() & (rooms != prev_rooms))
            .then(pl.lit("Remodelling"))
            .when(area.is_not_null() & prev_area.is_not_null() & (area > prev_area))
            .then(pl.lit("Extension"))
            # Lower rank = better band (A=1), so a drop in rank is an improvement.
            .when(rank.is_not_null() & prev_rank.is_not_null() & (rank < prev_rank))
            .then(pl.lit("Renovation"))
            .otherwise(pl.lit(None, dtype=pl.String))
            .alias("_event"),
        )
        .filter(pl.col("_event").is_not_null())
        .with_columns(
            pl.col("inspection_date").dt.year().cast(pl.Int32).alias("_event_year"),
        )
        .group_by(*_EPC_KEYS)
        .agg(
            pl.struct(
                pl.col("_event_year").alias("year"),
                pl.col("_event").alias("event"),
            ).alias("renovation_history"),
        )
        .collect()
    )


def _epc_social_tenure(epc_base: pl.LazyFrame) -> pl.DataFrame:
    """Return the EPC keys of properties with any certificate whose tenure mentions "social".

    Each returned key carries ``was_council_house = "Yes"``.
    """
    return (
        epc_base.filter(pl.col("tenure").str.to_lowercase().str.contains("social"))
        .select(*_EPC_KEYS)
        .unique()
        .with_columns(pl.lit("Yes").alias("was_council_house"))
        .collect()
    )


def _price_paid_properties(price_paid_path: Path) -> pl.LazyFrame:
    """Aggregate Price Paid transactions into one row per property.

    Transactions are grouped by normalised address + postcode. "Other" property
    types and rows without a usable postcode or address are excluded. Price
    aggregations (``historical_prices``, ``latest_price``,
    ``date_of_transfer``) only use quality-passing sales. ``first_transfer_date``
    and ``old_new`` come from the earliest transfer of any kind. Groups with no
    quality-passing sale are dropped.
    """
    # https://www.gov.uk/guidance/about-the-price-paid-data
    property_type_map = {
        "D": "Detached",
        "S": "Semi-Detached",
        "T": "Terraced",
        "F": "Flats/Maisonettes",
        "O": "Other",
    }
    duration_map = {"F": "Freehold", "L": "Leasehold"}

    # price >= MIN_PRICE and ppd_category == "A" (standard open-market sale) are
    # VALUE-QUALITY filters: they gate the price aggregations only. Category B
    # entries (repossessions, bulk/portfolio, power-of-sale transfers) and
    # sub-MIN sales must not pollute latest_price / historical_prices (and the
    # downstream price-per-sqm feature). They MUST still count for
    # first_transfer_date / old_new, so a new-build's genuine earliest transfer
    # year is preserved.
    price_ok = pl.col("price") >= MIN_PRICE
    category_ok = pl.col("ppd_category") == "A"
    quality_ok = price_ok & category_ok

    return (
        pl.scan_parquet(price_paid_path)
        .select(
            "price",
            "date_of_transfer",
            pl.col("property_type")
            .alias("pp_property_type")
            .replace(property_type_map),
            pl.col("postcode").str.strip_chars(),
            "paon",
            "saon",
            "street",
            "locality",
            "town_city",
            pl.col("duration").replace(duration_map),
            "old_new",
            "ppd_category",
        )
        .filter(pl.col("pp_property_type") != "Other")
        .with_columns(
            pl.concat_str(
                [pl.col("saon"), pl.col("paon"), pl.col("street")],
                separator=" ",
                ignore_nulls=True,
            ).alias("pp_address"),
        )
        .with_columns(
            normalize_address_key(pl.col("pp_address")).alias("_pp_match_address"),
            normalize_postcode_key(pl.col("postcode")).alias("_pp_match_postcode"),
        )
        .filter(pl.col("_pp_match_postcode").is_not_null())
        .with_columns(
            pl.coalesce("_pp_match_address", "pp_address").alias("_pp_group_address"),
            pl.col("_pp_match_postcode").alias("_pp_group_postcode"),
        )
        .filter(pl.col("pp_address").is_not_null())
        .sort("date_of_transfer")
        .group_by("_pp_group_address", "_pp_group_postcode", maintain_order=True)
        .agg(
            pl.col("pp_address").last(),
            pl.col("postcode").last(),
            pl.col("_pp_match_address").last(),
            pl.col("_pp_match_postcode").last(),
            # Price aggregations are restricted to quality-passing sales.
            pl.struct(
                pl.col("date_of_transfer").dt.year().alias("year"),
                pl.col("date_of_transfer").dt.month().cast(pl.UInt8).alias("month"),
                "price",
            )
            .filter(quality_ok)
            .alias("historical_prices"),
            pl.col("pp_property_type").last(),
            pl.col("duration").last(),
            pl.col("price").filter(quality_ok).last().alias("latest_price"),
            pl.col("date_of_transfer").filter(quality_ok).last(),
            # first_transfer_date / old_new reflect the genuine earliest transfer
            # over the full per-group transaction stream (not value-filtered).
            pl.col("date_of_transfer").first().alias("first_transfer_date"),
            pl.col("old_new").first(),
        )
        # Keep the same property universe as when the price floor was applied
        # before grouping: drop groups with no quality-passing sale.
        .filter(pl.col("latest_price").is_not_null())
    )


def _print_match_stats(joined: pl.DataFrame) -> None:
    """Print how many joined properties matched an EPC record."""
    total = joined.height
    matched = joined.filter(
        pl.col("epc_address").is_not_null() & pl.col("pp_address").is_not_null()
    ).height
    # Guard the percentage: an empty join would otherwise raise
    # ZeroDivisionError before the output file is written.
    matched_pct = 100 * matched / total if total else 0.0
    print(f"Unique properties: {total}")
    print(f"Matched: {matched} ({matched_pct:.1f}%)")
    print(f"Unmatched: {total - matched}")


def _with_construction_year(joined: pl.DataFrame) -> pl.DataFrame:
    """Replace the EPC age band with a build year and drop internal key columns.

    For new-builds (old_new == "Y") the first transfer's year is used as the
    exact construction year. Otherwise the year comes from
    ``epc_band_to_year``. ``is_construction_date_approximate`` is:
      - 0 for transfer-derived years;
      - 1 for band-derived years;
      - null when neither is available.
    """
    epc_band_year = epc_band_to_year(pl.col("construction_age_band"))
    transfer_year = (
        pl.col("first_transfer_date").dt.year().cast(pl.UInt16, strict=False)
    )
    is_new_build = pl.col("old_new") == "Y"
    use_transfer_year = is_new_build & transfer_year.is_not_null()

    return joined.with_columns(
        pl.when(use_transfer_year)
        .then(transfer_year)
        .otherwise(epc_band_year)
        .alias("construction_age_band"),
        pl.when(use_transfer_year)
        .then(pl.lit(0, dtype=pl.UInt8))
        .when(epc_band_year.is_not_null())
        .then(pl.lit(1, dtype=pl.UInt8))
        .otherwise(pl.lit(None, dtype=pl.UInt8))
        .alias("is_construction_date_approximate"),
    ).drop(
        [
            "old_new",
            "first_transfer_date",
            "_pp_match_address",
            "_pp_match_postcode",
            "_pp_group_address",
            "_pp_group_postcode",
            "_epc_match_address",
            "_epc_match_postcode",
        ],
        strict=False,
    )
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|