Can't even keep track anymore

This commit is contained in:
Andras Schmelczer 2026-02-13 09:16:28 +00:00
parent dccc1e439d
commit 3a3f899ea2
50 changed files with 1144 additions and 560 deletions

View file

@ -6,6 +6,8 @@ from ..utils import fuzzy_join_on_postcode
pl.Config.set_tbl_cols(-1)
RATING_RANK = {"A": 1, "B": 2, "C": 3, "D": 4, "E": 5, "F": 6, "G": 7}
def main():
parser = argparse.ArgumentParser(description="Fuzzy join EPC and Price Paid data")
@ -20,7 +22,7 @@ def main():
)
args = parser.parse_args()
epc = (
epc_base = (
pl.scan_csv(args.epc)
.select(
pl.col("ADDRESS").alias("epc_address"),
@ -42,11 +44,90 @@ def main():
.otherwise(pl.col("NUMBER_HABITABLE_ROOMS"))
.alias("NUMBER_HABITABLE_ROOMS"),
)
.sort("INSPECTION_DATE", descending=True)
)
# Dedup fork: keep latest certificate per property (existing logic)
epc = (
epc_base.sort("INSPECTION_DATE", descending=True)
.group_by("epc_address", "POSTCODE")
.first()
)
# Events fork: detect renovation events between consecutive certificates
# Collect eagerly because .over() window functions don't work in streaming
# engine (fuzzy_join.py:50 uses sink_parquet which requires streaming).
events = (
epc_base.sort("INSPECTION_DATE")
.with_columns(
pl.col("CURRENT_ENERGY_RATING")
.replace_strict(RATING_RANK, default=None, return_dtype=pl.Int32)
.alias("_rating_rank"),
)
.with_columns(
pl.col("NUMBER_HABITABLE_ROOMS")
.shift(1)
.over("epc_address", "POSTCODE")
.alias("_prev_rooms"),
pl.col("TOTAL_FLOOR_AREA")
.shift(1)
.over("epc_address", "POSTCODE")
.alias("_prev_area"),
pl.col("_rating_rank")
.shift(1)
.over("epc_address", "POSTCODE")
.alias("_prev_rating_rank"),
)
.with_columns(
pl.when(
pl.col("NUMBER_HABITABLE_ROOMS").is_not_null()
& pl.col("_prev_rooms").is_not_null()
& (pl.col("NUMBER_HABITABLE_ROOMS") != pl.col("_prev_rooms"))
)
.then(pl.lit("Remodeling"))
.when(
pl.col("TOTAL_FLOOR_AREA").is_not_null()
& pl.col("_prev_area").is_not_null()
& (pl.col("TOTAL_FLOOR_AREA") > pl.col("_prev_area"))
)
.then(pl.lit("Extension"))
.when(
pl.col("_rating_rank").is_not_null()
& pl.col("_prev_rating_rank").is_not_null()
& (pl.col("_rating_rank") < pl.col("_prev_rating_rank"))
)
.then(pl.lit("Renovation"))
.otherwise(pl.lit(None, dtype=pl.String))
.alias("_event"),
)
.filter(pl.col("_event").is_not_null())
.with_columns(
pl.col("INSPECTION_DATE")
.cast(pl.String)
.str.slice(0, 4)
.cast(pl.Int32)
.alias("_event_year"),
)
.group_by("epc_address", "POSTCODE")
.agg(
pl.struct(
pl.col("_event_year").alias("year"),
pl.col("_event").alias("event"),
).alias("renovation_history"),
)
.collect()
)
event_counts = events["renovation_history"].explode().struct.field("event").value_counts()
print(f"Renovation events: {events.height} properties with events")
print(event_counts)
# Left-join events back onto dedup EPC
epc = epc.join(
events.lazy(),
on=["epc_address", "POSTCODE"],
how="left",
)
print("EPC dataset")
print(epc.head().collect())

View file

@ -42,6 +42,7 @@ def _build_wide(
school_proximity_path: Path,
broadband_path: Path,
geosure_path: Path,
rental_prices_path: Path,
) -> pl.DataFrame:
"""Build the wide dataframe by joining epc_pp with all auxiliary data."""
wide = (
@ -94,6 +95,21 @@ def _build_wide(
how="left",
)
# Derive bedroom count: habitable rooms - 1 (assuming 1 reception room), clipped to 0..4
wide = wide.with_columns(
(pl.col("number_habitable_rooms") - 1)
.clip(0, 4)
.cast(pl.UInt8)
.alias("_bedrooms"),
)
rental = pl.scan_parquet(rental_prices_path)
wide = wide.join(
rental,
left_on=["Local Authority District code (2024)", "_bedrooms"],
right_on=["area_code", "bedrooms"],
how="left",
)
crime = pl.scan_parquet(crime_path)
wide = wide.join(crime, left_on="lsoa21", right_on="LSOA code", how="left")
@ -208,6 +224,7 @@ def _build_wide(
.drop(
"inspection_date",
"floor_height",
"_bedrooms",
"LSOA name (2021)",
"Local Authority District code (2024)",
"Local Authority District name (2024)",
@ -258,6 +275,7 @@ def _build_wide(
"running_sand_risk": "Running sand risk",
"shrink_swell_risk": "Shrink-swell risk",
"soluble_rocks_risk": "Soluble rocks risk",
"median_monthly_rent": "Estimated monthly rent",
}
)
)
@ -332,6 +350,12 @@ def main():
required=True,
help="GeoSure ground stability parquet file",
)
parser.add_argument(
"--rental-prices",
type=Path,
required=True,
help="ONS rental prices by LA and bedroom count parquet file",
)
parser.add_argument(
"--output", type=Path, required=True, help="Output parquet file path"
)
@ -350,6 +374,7 @@ def main():
school_proximity_path=args.school_proximity,
broadband_path=args.broadband,
geosure_path=args.geosure,
rental_prices_path=args.rental_prices,
)
print(f"Columns: {wide.columns}")