Rerun data pipelines

This commit is contained in:
Andras Schmelczer 2026-05-10 14:49:53 +01:00
parent 4c95815dc8
commit fc10381692
27 changed files with 2143 additions and 215 deletions

View file

@ -1,6 +1,15 @@
import argparse
import polars as pl
import csv
import io
import tempfile
import zipfile
from pathlib import Path
import polars as pl
import pyarrow as pa
import pyarrow.csv as pa_csv
import pyarrow.parquet as pq
from ..utils import fuzzy_join_on_postcode
@ -8,12 +17,168 @@ pl.Config.set_tbl_cols(-1)
RATING_RANK = {"A": 1, "B": 2, "C": 3, "D": 4, "E": 5, "F": 6, "G": 7}
MIN_PRICE = 50_000
EPC_SOURCE_COLUMNS = [
"address",
"postcode",
"current_energy_rating",
"potential_energy_rating",
"property_type",
"built_form",
"inspection_date",
"total_floor_area",
"number_habitable_rooms",
"floor_height",
"construction_age_band",
"tenure",
]
def _normalise_csv_columns(columns: list[str]) -> list[str]:
return [column.strip().lower() for column in columns]
def _clean_string(column: str) -> pl.Expr:
stripped = pl.col(column).cast(pl.String).str.strip_chars()
return pl.when(stripped == "").then(None).otherwise(stripped)
def _clean_number(column: str, dtype: pl.DataType) -> pl.Expr:
return _clean_string(column).cast(dtype, strict=False)
def _select_epc_columns(raw: pl.LazyFrame) -> pl.LazyFrame:
return (
raw.select(
_clean_string("address").alias("epc_address"),
_clean_string("postcode").str.to_uppercase().alias("epc_postcode"),
_clean_string("current_energy_rating")
.str.to_uppercase()
.alias("current_energy_rating"),
_clean_string("potential_energy_rating")
.str.to_uppercase()
.alias("potential_energy_rating"),
_clean_string("property_type").alias("epc_property_type"),
_clean_string("built_form").alias("built_form"),
_clean_string("inspection_date").alias("inspection_date"),
_clean_number("total_floor_area", pl.Float64).alias("total_floor_area"),
_clean_number("number_habitable_rooms", pl.Int16).alias(
"number_habitable_rooms"
),
_clean_number("floor_height", pl.Float64).alias("floor_height"),
_clean_string("construction_age_band").alias("construction_age_band"),
_clean_string("tenure").alias("tenure"),
)
.filter(pl.col("epc_address").is_not_null())
.with_columns(
pl.when(pl.col("number_habitable_rooms") == 0)
.then(None)
.otherwise(pl.col("number_habitable_rooms"))
.alias("number_habitable_rooms"),
)
)
def _certificate_member_names(zip_file: zipfile.ZipFile) -> list[str]:
return sorted(
name
for name in zip_file.namelist()
if not name.endswith("/")
and Path(name).name.lower().startswith("certificates")
and name.lower().endswith(".csv")
)
def _read_zip_csv_header(zip_file: zipfile.ZipFile, member_name: str) -> list[str]:
with zip_file.open(member_name) as member:
text = io.TextIOWrapper(member, encoding="utf-8-sig", newline="")
try:
return next(csv.reader(text))
except StopIteration as exc:
raise ValueError(f"EPC CSV member is empty: {member_name}") from exc
def _source_columns_for_header(header: list[str]) -> list[str]:
columns_by_normalised_name = {
normalised: source
for source, normalised in zip(header, _normalise_csv_columns(header))
}
return [
columns_by_normalised_name.get(column, column) for column in EPC_SOURCE_COLUMNS
]
def _zip_certificates_to_parquet(zip_path: Path, output_path: Path) -> None:
schema = pa.schema((column, pa.string()) for column in EPC_SOURCE_COLUMNS)
writer = pq.ParquetWriter(output_path, schema=schema, compression="zstd")
try:
try:
zip_file = zipfile.ZipFile(zip_path)
except zipfile.BadZipFile as exc:
raise ValueError(
f"{zip_path} is not a readable EPC zip archive; re-download "
"domestic-csv.zip and try again"
) from exc
with zip_file:
member_names = _certificate_member_names(zip_file)
if not member_names:
raise ValueError(f"No certificate CSV files found in {zip_path}")
for member_name in member_names:
print(f"Reading EPC certificates from {member_name}")
source_columns = _source_columns_for_header(
_read_zip_csv_header(zip_file, member_name)
)
convert_options = pa_csv.ConvertOptions(
include_columns=source_columns,
include_missing_columns=True,
column_types={
source_column: pa.string() for source_column in source_columns
},
strings_can_be_null=True,
)
read_options = pa_csv.ReadOptions(block_size=64 * 1024 * 1024)
with zip_file.open(member_name) as member:
reader = pa_csv.open_csv(
member,
read_options=read_options,
convert_options=convert_options,
)
while True:
try:
batch = reader.read_next_batch()
except StopIteration:
break
if batch.num_rows == 0:
continue
writer.write_batch(batch.rename_columns(EPC_SOURCE_COLUMNS))
finally:
writer.close()
def _scan_epc_certificates(epc_path: Path, temp_dir: Path) -> pl.LazyFrame:
if epc_path.suffix.lower() == ".zip":
parquet_path = temp_dir / "epc-certificates.parquet"
_zip_certificates_to_parquet(epc_path, parquet_path)
raw = pl.scan_parquet(parquet_path)
else:
raw = pl.scan_csv(
epc_path,
infer_schema=False,
with_column_names=_normalise_csv_columns,
)
return _select_epc_columns(raw)
def main():
parser = argparse.ArgumentParser(description="Fuzzy join EPC and Price Paid data")
parser.add_argument(
"--epc", type=Path, required=True, help="EPC certificates CSV file"
"--epc", type=Path, required=True, help="EPC certificates CSV file or zip"
)
parser.add_argument(
"--price-paid", type=Path, required=True, help="Price paid parquet file"
@ -23,74 +188,56 @@ def main():
)
args = parser.parse_args()
epc_base = (
pl.scan_csv(args.epc)
.select(
pl.col("ADDRESS").alias("epc_address"),
"POSTCODE",
"CURRENT_ENERGY_RATING",
"POTENTIAL_ENERGY_RATING",
pl.col("PROPERTY_TYPE").alias("epc_property_type"),
"BUILT_FORM",
"INSPECTION_DATE",
"TOTAL_FLOOR_AREA",
"NUMBER_HABITABLE_ROOMS",
"FLOOR_HEIGHT",
"CONSTRUCTION_AGE_BAND",
"TENURE",
)
.filter(pl.col("epc_address").is_not_null())
.with_columns(
pl.when(pl.col("NUMBER_HABITABLE_ROOMS") == 0)
.then(None)
.otherwise(pl.col("NUMBER_HABITABLE_ROOMS"))
.alias("NUMBER_HABITABLE_ROOMS"),
)
)
with tempfile.TemporaryDirectory(prefix="epc_certificates_") as tmpdir:
_run(args.epc, args.price_paid, args.output, Path(tmpdir))
def _run(epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Path):
epc_base = _scan_epc_certificates(epc_path, temp_dir)
# Dedup fork: keep latest certificate per property (existing logic)
epc = (
epc_base.sort("INSPECTION_DATE", descending=True)
.group_by("epc_address", "POSTCODE")
epc_base.sort("inspection_date", descending=True)
.group_by("epc_address", "epc_postcode")
.first()
.drop("TENURE")
.drop("tenure")
)
# Events fork: detect renovation events between consecutive certificates
# Collect eagerly because .over() window functions don't work in streaming
# engine (fuzzy_join.py:50 uses sink_parquet which requires streaming).
events = (
epc_base.sort("INSPECTION_DATE")
epc_base.sort("inspection_date")
.with_columns(
pl.col("CURRENT_ENERGY_RATING")
pl.col("current_energy_rating")
.replace_strict(RATING_RANK, default=None, return_dtype=pl.Int32)
.alias("_rating_rank"),
)
.with_columns(
pl.col("NUMBER_HABITABLE_ROOMS")
pl.col("number_habitable_rooms")
.shift(1)
.over("epc_address", "POSTCODE")
.over("epc_address", "epc_postcode")
.alias("_prev_rooms"),
pl.col("TOTAL_FLOOR_AREA")
pl.col("total_floor_area")
.shift(1)
.over("epc_address", "POSTCODE")
.over("epc_address", "epc_postcode")
.alias("_prev_area"),
pl.col("_rating_rank")
.shift(1)
.over("epc_address", "POSTCODE")
.over("epc_address", "epc_postcode")
.alias("_prev_rating_rank"),
)
.with_columns(
pl.when(
pl.col("NUMBER_HABITABLE_ROOMS").is_not_null()
pl.col("number_habitable_rooms").is_not_null()
& pl.col("_prev_rooms").is_not_null()
& (pl.col("NUMBER_HABITABLE_ROOMS") != pl.col("_prev_rooms"))
& (pl.col("number_habitable_rooms") != pl.col("_prev_rooms"))
)
.then(pl.lit("Remodelling"))
.when(
pl.col("TOTAL_FLOOR_AREA").is_not_null()
pl.col("total_floor_area").is_not_null()
& pl.col("_prev_area").is_not_null()
& (pl.col("TOTAL_FLOOR_AREA") > pl.col("_prev_area"))
& (pl.col("total_floor_area") > pl.col("_prev_area"))
)
.then(pl.lit("Extension"))
.when(
@ -104,13 +251,13 @@ def main():
)
.filter(pl.col("_event").is_not_null())
.with_columns(
pl.col("INSPECTION_DATE")
pl.col("inspection_date")
.cast(pl.String)
.str.slice(0, 4)
.cast(pl.Int32)
.alias("_event_year"),
)
.group_by("epc_address", "POSTCODE")
.group_by("epc_address", "epc_postcode")
.agg(
pl.struct(
pl.col("_event_year").alias("year"),
@ -128,8 +275,8 @@ def main():
# Social tenure fork: flag properties that were ever social housing
social_tenure = (
epc_base.filter(pl.col("TENURE").str.to_lowercase().str.contains("social"))
.select("epc_address", "POSTCODE")
epc_base.filter(pl.col("tenure").str.to_lowercase().str.contains("social"))
.select("epc_address", "epc_postcode")
.unique()
.with_columns(pl.lit("Yes").alias("was_council_house"))
.collect()
@ -140,12 +287,12 @@ def main():
epc = (
epc.join(
events.lazy(),
on=["epc_address", "POSTCODE"],
on=["epc_address", "epc_postcode"],
how="left",
)
.join(
social_tenure.lazy(),
on=["epc_address", "POSTCODE"],
on=["epc_address", "epc_postcode"],
how="left",
)
.with_columns(
@ -167,7 +314,7 @@ def main():
duration_map = {"F": "Freehold", "L": "Leasehold"}
price_paid = (
pl.scan_parquet(args.price_paid)
pl.scan_parquet(price_paid_path)
.select(
"price",
"date_of_transfer",
@ -219,9 +366,9 @@ def main():
left_address_col="pp_address",
right_address_col="epc_address",
left_postcode_col="postcode",
right_postcode_col="POSTCODE",
right_postcode_col="epc_postcode",
)
.drop("POSTCODE")
.drop("epc_postcode")
.collect(engine="streaming")
)
@ -236,7 +383,7 @@ def main():
# For new-builds (old_new == "Y"), use the first transaction date year as
# the exact construction date; otherwise fall back to the EPC age band.
epc_band_year = (
pl.col("CONSTRUCTION_AGE_BAND")
pl.col("construction_age_band")
.str.replace("England and Wales: ", "")
.str.replace(" onwards", "")
.str.extract(r"(\d{4})", 1)
@ -251,7 +398,7 @@ def main():
pl.when(is_new_build & transfer_year.is_not_null())
.then(transfer_year)
.otherwise(epc_band_year)
.alias("CONSTRUCTION_AGE_BAND"),
.alias("construction_age_band"),
pl.when(is_new_build & transfer_year.is_not_null())
.then(pl.lit(0, dtype=pl.UInt8))
.when(epc_band_year.is_not_null())
@ -263,8 +410,8 @@ def main():
joined = joined.rename({col: col.lower() for col in joined.columns})
print(joined.head())
joined.write_parquet(args.output)
print(f"Wrote {args.output}")
joined.write_parquet(output_path)
print(f"Wrote {output_path}")
if __name__ == "__main__":