Update data

This commit is contained in:
Andras Schmelczer 2026-05-14 08:17:10 +01:00
parent a4103b0896
commit 273d7a83ee
15 changed files with 716 additions and 316 deletions

View file

@ -10,7 +10,11 @@ import pyarrow as pa
import pyarrow.csv as pa_csv
import pyarrow.parquet as pq
from ..utils import fuzzy_join_on_postcode
from ..utils import (
fuzzy_join_on_postcode,
normalize_address_key,
normalize_postcode_key,
)
pl.Config.set_tbl_cols(-1)
@ -193,12 +197,15 @@ def main():
def _run(epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Path):
epc_base = _scan_epc_certificates(epc_path, temp_dir)
epc_base = _scan_epc_certificates(epc_path, temp_dir).with_columns(
normalize_address_key(pl.col("epc_address")).alias("_epc_match_address"),
normalize_postcode_key(pl.col("epc_postcode")).alias("_epc_match_postcode"),
)
# Dedup fork: keep latest certificate per property (existing logic)
epc = (
epc_base.sort("inspection_date", descending=True)
.group_by("epc_address", "epc_postcode")
.group_by("_epc_match_address", "_epc_match_postcode")
.first()
.drop("tenure")
)
@ -216,15 +223,15 @@ def _run(epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Pat
.with_columns(
pl.col("number_habitable_rooms")
.shift(1)
.over("epc_address", "epc_postcode")
.over("_epc_match_address", "_epc_match_postcode")
.alias("_prev_rooms"),
pl.col("total_floor_area")
.shift(1)
.over("epc_address", "epc_postcode")
.over("_epc_match_address", "_epc_match_postcode")
.alias("_prev_area"),
pl.col("_rating_rank")
.shift(1)
.over("epc_address", "epc_postcode")
.over("_epc_match_address", "_epc_match_postcode")
.alias("_prev_rating_rank"),
)
.with_columns(
@ -257,7 +264,7 @@ def _run(epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Pat
.cast(pl.Int32)
.alias("_event_year"),
)
.group_by("epc_address", "epc_postcode")
.group_by("_epc_match_address", "_epc_match_postcode")
.agg(
pl.struct(
pl.col("_event_year").alias("year"),
@ -276,7 +283,7 @@ def _run(epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Pat
# Social tenure fork: flag properties that were ever social housing
social_tenure = (
epc_base.filter(pl.col("tenure").str.to_lowercase().str.contains("social"))
.select("epc_address", "epc_postcode")
.select("_epc_match_address", "_epc_match_postcode")
.unique()
.with_columns(pl.lit("Yes").alias("was_council_house"))
.collect()
@ -287,12 +294,12 @@ def _run(epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Pat
epc = (
epc.join(
events.lazy(),
on=["epc_address", "epc_postcode"],
on=["_epc_match_address", "_epc_match_postcode"],
how="left",
)
.join(
social_tenure.lazy(),
on=["epc_address", "epc_postcode"],
on=["_epc_match_address", "_epc_match_postcode"],
how="left",
)
.with_columns(
@ -339,9 +346,23 @@ def _run(epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Pat
ignore_nulls=True,
).alias("pp_address"),
)
.with_columns(
normalize_address_key(pl.col("pp_address")).alias("_pp_match_address"),
normalize_postcode_key(pl.col("postcode")).alias("_pp_match_postcode"),
)
.filter(pl.col("_pp_match_postcode").is_not_null())
.with_columns(
pl.coalesce("_pp_match_address", "pp_address").alias("_pp_group_address"),
pl.col("_pp_match_postcode").alias("_pp_group_postcode"),
)
.filter(pl.col("pp_address").is_not_null())
.sort("date_of_transfer")
.group_by("pp_address", "postcode", maintain_order=True)
.group_by("_pp_group_address", "_pp_group_postcode", maintain_order=True)
.agg(
pl.col("pp_address").last(),
pl.col("postcode").last(),
pl.col("_pp_match_address").last(),
pl.col("_pp_match_postcode").last(),
pl.struct(
pl.col("date_of_transfer").dt.year().alias("year"),
pl.col("date_of_transfer").dt.month().cast(pl.UInt8).alias("month"),
@ -354,7 +375,7 @@ def _run(epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Pat
pl.col("date_of_transfer").first().alias("first_transfer_date"),
pl.col("old_new").first(),
)
).filter(pl.col("pp_address").is_not_null())
)
print("Price paid dataset")
print(price_paid.head().collect())
@ -405,7 +426,19 @@ def _run(epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Pat
.then(pl.lit(1, dtype=pl.UInt8))
.otherwise(pl.lit(None, dtype=pl.UInt8))
.alias("is_construction_date_approximate"),
).drop("old_new", "first_transfer_date")
).drop(
[
"old_new",
"first_transfer_date",
"_pp_match_address",
"_pp_match_postcode",
"_pp_group_address",
"_pp_group_postcode",
"_epc_match_address",
"_epc_match_postcode",
],
strict=False,
)
joined = joined.rename({col: col.lower() for col in joined.columns})