"""Augment wide.parquet with an estimated current price column. Joins the precomputed repeat-sales price index (from price_index.py) with each property's last known sale to produce an inflation-adjusted current price estimate. Uses type-stratified index when available, falling back to "All" type. Modifies wide.parquet in-place, adding the "Estimated current price" column. """ import argparse from pathlib import Path import polars as pl CURRENT_YEAR = 2025 TERRACE_TYPES = ["Mid-Terrace", "End-Terrace", "Enclosed Mid-Terrace", "Enclosed End-Terrace"] def type_group_expr(): return ( pl.when(pl.col("Property type").is_in(TERRACE_TYPES)).then(pl.lit("Terraced")) .when(pl.col("Property type") == "Flats/Maisonettes").then(pl.lit("Flats")) .when(pl.col("Property type").is_in(["Detached", "Semi-Detached"])).then(pl.col("Property type")) .otherwise(pl.lit(None)) .alias("type_group") ) def main(): parser = argparse.ArgumentParser(description="Augment wide.parquet with estimated current prices") parser.add_argument("--input", type=Path, required=True, help="Path to wide.parquet (modified in-place)") parser.add_argument("--index", type=Path, required=True, help="Path to price_index.parquet") args = parser.parse_args() print("Loading wide.parquet...") df = pl.read_parquet(args.input) print(f" {len(df):,} rows, {len(df.columns)} columns") # Drop existing estimated price column if re-running if "Estimated current price" in df.columns: df = df.drop("Estimated current price") # Derive helper columns for the join has_price = ( pl.col("Last known price").is_not_null() & pl.col("Postcode").is_not_null() & pl.col("Date of last transaction").is_not_null() ) df = df.with_columns( pl.col("Postcode").str.slice(0, pl.col("Postcode").str.len_chars() - 2).str.strip_chars().alias("_sector"), pl.col("Date of last transaction").dt.year().alias("_sale_year"), type_group_expr().alias("_type_group"), ) index = pl.read_parquet(args.index) has_type_group = "type_group" in index.columns if has_type_group: print(f" Price index: {len(index):,} rows, {index['sector'].n_unique():,} sectors, " f"{index['type_group'].n_unique()} type groups") else: print(f" Price index: {len(index):,} rows, {index['sector'].n_unique():,} sectors (unstratified)") print("\nApplying repeat-sales index...") if has_type_group: idx_typed = index.filter(pl.col("type_group") != "All") idx_all = index.filter(pl.col("type_group") == "All") # Join type-specific index at sale year df = df.join( idx_typed.select("sector", "type_group", "year", pl.col("log_index").alias("log_idx_sale_typed")), left_on=["_sector", "_type_group", "_sale_year"], right_on=["sector", "type_group", "year"], how="left", ) # Join "All" index at sale year df = df.join( idx_all.select("sector", "year", pl.col("log_index").alias("log_idx_sale_all")), left_on=["_sector", "_sale_year"], right_on=["sector", "year"], how="left", ) # Join type-specific index at current year df = df.join( idx_typed.filter(pl.col("year") == CURRENT_YEAR) .select("sector", "type_group", pl.col("log_index").alias("log_idx_cur_typed")), left_on=["_sector", "_type_group"], right_on=["sector", "type_group"], how="left", ) # Join "All" index at current year df = df.join( idx_all.filter(pl.col("year") == CURRENT_YEAR) .select("sector", pl.col("log_index").alias("log_idx_cur_all")), left_on="_sector", right_on="sector", how="left", ) df = df.with_columns( pl.col("log_idx_sale_typed").fill_null(pl.col("log_idx_sale_all")).alias("_log_index_sale"), pl.col("log_idx_cur_typed").fill_null(pl.col("log_idx_cur_all")).alias("_log_index_current"), ) else: df = df.join( index.select("sector", "year", pl.col("log_index").alias("_log_index_sale")), left_on=["_sector", "_sale_year"], right_on=["sector", "year"], how="left", ) index_current = ( index.filter(pl.col("year") == CURRENT_YEAR) .select("sector", pl.col("log_index").alias("_log_index_current")) ) df = df.join(index_current, left_on="_sector", right_on="sector", how="left") # Compute estimate — only for rows with a known price df = df.with_columns( pl.when(has_price) .then( pl.col("Last known price").cast(pl.Float64) * (pl.col("_log_index_current") - pl.col("_log_index_sale")).exp() ) .otherwise(pl.lit(None)) .alias("Estimated current price"), ) n_adjusted = df.filter( has_price & pl.col("_log_index_sale").is_not_null() ).height n_with_price = df.filter(has_price).height print(f" {n_adjusted:,} of {n_with_price:,} properties adjusted by index ({n_adjusted / max(n_with_price, 1) * 100:.1f}%)") # Drop all temporary columns temp_cols = [c for c in df.columns if c.startswith("_") or c.startswith("log_idx_")] df = df.drop(temp_cols) df.write_parquet(args.input) size_mb = args.input.stat().st_size / (1024 * 1024) print(f"\nWrote {args.input} ({size_mb:.1f} MB)") print(f" {len(df):,} rows, {len(df.columns)} columns (including 'Estimated current price')") if __name__ == "__main__": main()