"""Augment properties.parquet with estimated current prices. For properties with a known prior sale, applies the repeat-sales price index to adjust the last known price to the current date, then blends with kNN estimates from nearby recently-sold properties. Includes: - Capping extreme index adjustments - kNN spatial blending Modifies properties.parquet in-place. Temporarily joins postcode.parquet for lat/lon needed by kNN, then drops those columns before writing. """ import argparse from pathlib import Path import polars as pl from pipeline.transform.price_estimation.knn import ( KNN_BLEND_WEIGHT, build_knn_pool, knn_median_psm, ) from pipeline.transform.price_estimation.utils import ( CURRENT_FRAC_YEAR, MAX_LOG_ADJUSTMENT, interpolate_log_index, sector_expr, type_group_expr, ) def main(): parser = argparse.ArgumentParser( description="Augment properties.parquet with estimated current prices" ) parser.add_argument( "--properties", type=Path, required=True, help="Path to properties.parquet (modified in-place)", ) parser.add_argument( "--postcodes", type=Path, required=True, help="Path to postcode.parquet (for lat/lon needed by kNN)", ) parser.add_argument( "--index", type=Path, required=True, help="Path to price_index.parquet" ) args = parser.parse_args() print("Loading properties.parquet...") df = pl.read_parquet(args.properties) print(f" {len(df):,} rows, {len(df.columns)} columns") # Join lat/lon from postcode.parquet for kNN spatial queries postcodes = pl.read_parquet(args.postcodes).select("Postcode", "lat", "lon") df = df.join(postcodes, on="Postcode", how="left") print(f" Joined lat/lon from {len(postcodes):,} postcodes") # Drop existing estimated columns if re-running for col in ["Estimated current price", "Est. price per sqm"]: if col in df.columns: df = df.drop(col) # Derive helper columns df = df.with_columns( sector_expr().alias("_sector"), ( pl.col("Date of last transaction").dt.year().cast(pl.Float64) + (pl.col("Date of last transaction").dt.month().cast(pl.Float64) - 1.0) / 12.0 ).alias("_sale_frac_year"), type_group_expr().alias("_type_group"), pl.lit(CURRENT_FRAC_YEAR).alias("_current_frac_year"), ) index = pl.read_parquet(args.index) print( f" Price index: {len(index):,} rows, {index['sector'].n_unique():,} sectors, " f"{index['type_group'].n_unique()} type groups" ) print("\nApplying repeat-sales index with fractional year interpolation...") df = interpolate_log_index( index, df, "_sector", "_type_group", "_sale_frac_year", "_log_index_sale_interp" ) df = interpolate_log_index( index, df, "_sector", "_type_group", "_current_frac_year", "_log_index_current_interp", ) # Compute index-adjusted estimate with cap has_price = ( pl.col("Last known price").is_not_null() & pl.col("Postcode").is_not_null() & pl.col("Date of last transaction").is_not_null() ) df = df.with_columns( pl.when(has_price) .then( pl.col("Last known price").cast(pl.Float64) * (pl.col("_log_index_current_interp") - pl.col("_log_index_sale_interp")) .clip(-MAX_LOG_ADJUSTMENT, MAX_LOG_ADJUSTMENT) .exp() ) .otherwise(pl.lit(None)) .alias("Estimated current price"), ) n_estimated = df.filter(pl.col("Estimated current price").is_not_null()).height n_with_price = df.filter(has_price).height print( f" {n_estimated:,} of {n_with_price:,} properties estimated " f"({n_estimated / max(n_with_price, 1) * 100:.1f}%)" ) # --- kNN blending --- print("\nBuilding kNN estimates...") trees = build_knn_pool(df.lazy(), index, CURRENT_FRAC_YEAR) lat = df["lat"].cast(pl.Float64).to_numpy() lon = df["lon"].cast(pl.Float64).to_numpy() tg = df["_type_group"].fill_null("").to_numpy() fa = df["Total floor area (sqm)"].cast(pl.Float64).fill_null(0.0).to_numpy() knn_psm = knn_median_psm(trees, lat, lon, tg) knn_est = knn_psm * fa # No temporal adj: ref == current df = df.with_columns( pl.Series("_knn_est", knn_est, dtype=pl.Float64), ) # Blend: where kNN available, use weighted average; else keep index df = df.with_columns( pl.when( pl.col("Estimated current price").is_not_null() & pl.col("_knn_est").is_not_null() & pl.col("_knn_est").is_finite() & (pl.col("_knn_est") > 0) ) .then( (1 - KNN_BLEND_WEIGHT) * pl.col("Estimated current price") + KNN_BLEND_WEIGHT * pl.col("_knn_est") ) .when(pl.col("Estimated current price").is_not_null()) .then(pl.col("Estimated current price")) .otherwise(pl.lit(None)) .alias("Estimated current price"), ) n_blended = df.filter( pl.col("_knn_est").is_not_null() & pl.col("_knn_est").is_finite() & (pl.col("_knn_est") > 0) & pl.col("Estimated current price").is_not_null() ).height print(f" kNN blended: {n_blended:,} of {n_estimated:,} estimates") # Derive estimated price per sqm where both estimated price and floor area exist df = df.with_columns( (pl.col("Estimated current price") / pl.col("Total floor area (sqm)")) .round(0) .cast(pl.Int32, strict=False) .alias("Est. price per sqm"), ) # Drop all temporary columns and joined lat/lon (those belong in postcode.parquet) temp_cols = [c for c in df.columns if c.startswith("_") or c.startswith("log_idx_")] df = df.drop(temp_cols).drop("lat", "lon") df.write_parquet(args.properties) size_mb = args.properties.stat().st_size / (1024 * 1024) print(f"\nWrote {args.properties} ({size_mb:.1f} MB)") print( f" {len(df):,} rows, {len(df.columns)} columns (including 'Estimated current price')" ) if __name__ == "__main__": main()