"""Backtest price estimation on held-out recent sales. Uses temporal holdout: index built from pairs before TEST_YEAR_MIN only. Test set: properties with 2+ sales where the last sale >= TEST_YEAR_MIN. Evaluates: Naive vs Index vs kNN vs Blended. """ import argparse from pathlib import Path import numpy as np import polars as pl from pipeline.transform.price_estimation.index import build_index from pipeline.transform.price_estimation.knn import ( KNN_BLEND_WEIGHT, build_knn_pool, knn_median_psm, ) from pipeline.transform.price_estimation.utils import ( CURRENT_YEAR, MAX_LOG_ADJUSTMENT, interpolate_log_index, sector_expr, type_group_expr, ) TEST_YEAR_MIN = 2022 def extract_test_set(input_path: Path) -> pl.DataFrame: """Extract test pairs: second-to-last sale as input, last sale as ground truth.""" print("Loading test set...") df = ( pl.scan_parquet(input_path) .filter( pl.col("Postcode").is_not_null(), pl.col("historical_prices").list.len() >= 2, ) .with_columns( sector_expr(), type_group_expr(), # Last sale (ground truth) pl.col("historical_prices") .list.last() .struct.field("year") .alias("actual_year"), pl.col("historical_prices") .list.last() .struct.field("month") .alias("actual_month"), pl.col("historical_prices") .list.last() .struct.field("price") .alias("actual_price"), # Second-to-last sale (input) pl.col("historical_prices") .list.get(-2) .struct.field("year") .alias("input_year"), pl.col("historical_prices") .list.get(-2) .struct.field("month") .alias("input_month"), pl.col("historical_prices") .list.get(-2) .struct.field("price") .alias("input_price"), ) .with_columns( ( pl.col("actual_year").cast(pl.Float64) + (pl.col("actual_month").cast(pl.Float64) - 1.0) / 12.0 ).alias("actual_frac_year"), ( pl.col("input_year").cast(pl.Float64) + (pl.col("input_month").cast(pl.Float64) - 1.0) / 12.0 ).alias("input_frac_year"), ) .filter( pl.col("actual_year") >= TEST_YEAR_MIN, pl.col("input_price") > 0, pl.col("actual_price") > 0, pl.col("actual_frac_year") > pl.col("input_frac_year"), ) .collect() ) print(f" {len(df):,} test pairs (last sale {TEST_YEAR_MIN}-{CURRENT_YEAR})") return df def predict(test: pl.DataFrame, index: pl.DataFrame) -> pl.DataFrame: """Index-based prediction with interpolation and capping.""" test = interpolate_log_index( index, test, "sector", "type_group", "input_frac_year", "log_index_input" ) test = interpolate_log_index( index, test, "sector", "type_group", "actual_frac_year", "log_index_actual" ) test = test.with_columns( ( pl.col("input_price").cast(pl.Float64) * (pl.col("log_index_actual") - pl.col("log_index_input")) .clip(-MAX_LOG_ADJUSTMENT, MAX_LOG_ADJUSTMENT) .exp() ) .fill_null(pl.col("input_price").cast(pl.Float64)) .alias("predicted"), ) return test def compute_metrics(actual: np.ndarray, predicted: np.ndarray) -> dict: valid = ( np.isfinite(predicted) & np.isfinite(actual) & (actual > 0) & (predicted > 0) ) actual = actual[valid] predicted = predicted[valid] ape = np.abs(predicted - actual) / actual signed_err = predicted - actual return { "MdAPE (%)": float(np.median(ape) * 100), "% within 10%": float(np.mean(ape <= 0.10) * 100), "% within 20%": float(np.mean(ape <= 0.20) * 100), "% within 30%": float(np.mean(ape <= 0.30) * 100), "MAE (£)": float(np.mean(np.abs(signed_err))), "Mean signed error (£)": float(np.mean(signed_err)), "n": int(len(actual)), } def print_metrics_table(metrics_by_stage: dict): stages = list(metrics_by_stage.keys()) col_w = 15 width = 25 + col_w * len(stages) print("\n" + "=" * width) print(f"BACKTEST RESULTS (holdout: sales >= {TEST_YEAR_MIN})") print("=" * width) metric_names = [ "MdAPE (%)", "% within 10%", "% within 20%", "% within 30%", "MAE (£)", "Mean signed error (£)", "n", ] header = f"{'Metric':<25s}" for stage in stages: header += f" {stage:>{col_w - 1}s}" print(header) print("-" * width) for metric in metric_names: row = f"{metric:<25s}" for stage in stages: val = metrics_by_stage[stage][metric] if metric == "n": row += f" {val:>{col_w - 1},d}" elif "£" in metric: row += f" {val:>{col_w - 2},.0f}" else: row += f" {val:>{col_w - 2}.1f}%" print(row) print("=" * width) def main(): parser = argparse.ArgumentParser(description="Backtest price estimation model") parser.add_argument( "--input", type=Path, required=True, help="Path to properties.parquet" ) parser.add_argument( "--postcodes", type=Path, required=True, help="Path to postcode.parquet (for lat/lon)", ) parser.add_argument( "--output", type=Path, required=True, help="Output backtest_results.parquet" ) args = parser.parse_args() # Build index from pre-test data only (temporal holdout) print(f"Building price index (pairs with year2 < {TEST_YEAR_MIN})...") index = build_index( args.input, max_pair_year=TEST_YEAR_MIN, postcodes_path=args.postcodes ) print( f"\nHoldout index: {len(index):,} rows, {index['sector'].n_unique():,} sectors, " f"{index['type_group'].n_unique()} type groups" ) test = extract_test_set(args.input) # Join lat/lon from postcode.parquet (properties.parquet no longer has them) postcodes = pl.read_parquet(args.postcodes).select("Postcode", "lat", "lon") test = test.join(postcodes, on="Postcode", how="left") print("\nPredicting with price index...") test = predict(test, index) # --- kNN --- ref_fy = float(TEST_YEAR_MIN) # Pass joined LazyFrame (with lat/lon) instead of raw properties path pool_lf = pl.scan_parquet(args.input).join( postcodes.lazy(), on="Postcode", how="left" ) trees = build_knn_pool(pool_lf, index, ref_fy, max_sale_year=TEST_YEAR_MIN) # Interpolate log_index at reference year for temporal adjustment test = test.with_columns(pl.lit(ref_fy).alias("_ref_fy")) test = interpolate_log_index( index, test, "sector", "type_group", "_ref_fy", "_log_index_ref" ) lat = test["lat"].cast(pl.Float64).to_numpy() lon = test["lon"].cast(pl.Float64).to_numpy() tg = test["type_group"].to_numpy() fa = test["Total floor area (sqm)"].cast(pl.Float64).fill_null(0.0).to_numpy() print("\nComputing kNN estimates...") knn_psm = knn_median_psm(trees, lat, lon, tg) # Temporal adjustment: pool PSM is at ref, adjust to actual log_idx_actual = test["log_index_actual"].to_numpy().astype(np.float64) log_idx_ref = test["_log_index_ref"].to_numpy().astype(np.float64) temporal_adj = np.where( np.isfinite(log_idx_actual) & np.isfinite(log_idx_ref), np.exp(log_idx_actual - log_idx_ref), 1.0, ) knn_est = knn_psm * fa * temporal_adj n_knn = int((np.isfinite(knn_est) & (knn_est > 0)).sum()) print( f" kNN estimates: {n_knn:,} of {len(test):,} ({n_knn / len(test) * 100:.1f}%)" ) # Blend: (1-w)*index + w*kNN where both available index_est = test["predicted"].to_numpy().astype(np.float64) knn_valid = np.isfinite(knn_est) & (knn_est > 0) blended = np.where( knn_valid & np.isfinite(index_est), (1 - KNN_BLEND_WEIGHT) * index_est + KNN_BLEND_WEIGHT * knn_est, np.where(np.isfinite(index_est), index_est, knn_est), ) actual = test["actual_price"].to_numpy().astype(np.float64) metrics = { "Naive": compute_metrics( actual, test["input_price"].to_numpy().astype(np.float64) ), "Index": compute_metrics(actual, index_est), "kNN": compute_metrics(actual, knn_est), "Blended": compute_metrics(actual, blended), } print_metrics_table(metrics) # Save results result = test.select( "Postcode", "sector", "input_year", "input_frac_year", "input_price", "actual_year", "actual_frac_year", "actual_price", "predicted", ).with_columns( pl.Series("knn_predicted", knn_est, dtype=pl.Float64), pl.Series("blended", blended, dtype=pl.Float64), ) result.write_parquet(args.output) size_mb = args.output.stat().st_size / (1024 * 1024) print(f"\nWrote {args.output} ({size_mb:.1f} MB)") print(f" {len(result):,} rows") if __name__ == "__main__": main()