474 lines
15 KiB
Python
474 lines
15 KiB
Python
"""Hierarchical repeat-sales price index.
|
|
|
|
Stratified by property type and postcode sector, with IRLS Huber regression,
|
|
hierarchical shrinkage (sector → district → area → national → hedonic),
|
|
and KD-tree spatial smoothing for sparse sectors.
|
|
|
|
Output: price_index.parquet — sector x type_group x year -> log_index
|
|
"""
|
|
|
|
import argparse
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
import polars as pl
|
|
from scipy.sparse import csc_matrix
|
|
from scipy.sparse.linalg import lsqr
|
|
from tqdm import tqdm
|
|
|
|
from pipeline.transform.price_estimation.shrinkage import (
|
|
blend_dicts,
|
|
hierarchical_shrinkage,
|
|
shrink_dicts,
|
|
spatial_smooth,
|
|
)
|
|
from pipeline.transform.price_estimation.utils import (
|
|
CURRENT_YEAR,
|
|
TYPE_GROUPS,
|
|
build_hedonic_features,
|
|
extract_centroids,
|
|
hierarchy_keys,
|
|
sector_expr,
|
|
type_group_expr,
|
|
)
|
|
|
|
MIN_PAIRS = 5
|
|
OUTLIER_THRESHOLD = 3.0 # hard pre-filter; Huber handles the rest
|
|
HUBER_K = 1.345
|
|
IRLS_ITERATIONS = 5
|
|
|
|
|
|
def extract_pairs(input_path: Path, max_year2: int | None = None) -> pl.DataFrame:
|
|
"""Extract consecutive repeat-sale pairs.
|
|
|
|
If max_year2 is set, only pairs where year2 < max_year2 are included
|
|
(for temporal holdout in backtesting).
|
|
"""
|
|
print("Extracting repeat-sale pairs...")
|
|
df = (
|
|
pl.scan_parquet(input_path)
|
|
.select("Postcode", "historical_prices", "Property type")
|
|
.filter(
|
|
pl.col("Postcode").is_not_null(),
|
|
pl.col("historical_prices").list.len() >= 2,
|
|
)
|
|
.with_columns(sector_expr(), type_group_expr())
|
|
.collect()
|
|
)
|
|
print(f" {len(df):,} properties with 2+ transactions")
|
|
|
|
pairs = (
|
|
df.lazy()
|
|
.with_columns(
|
|
pl.col("historical_prices")
|
|
.list.slice(0, pl.col("historical_prices").list.len() - 1)
|
|
.alias("from_txn"),
|
|
pl.col("historical_prices").list.slice(1).alias("to_txn"),
|
|
)
|
|
.explode("from_txn", "to_txn")
|
|
.with_columns(
|
|
pl.col("from_txn").struct.field("year").alias("year1"),
|
|
pl.col("from_txn").struct.field("month").alias("month1"),
|
|
pl.col("from_txn").struct.field("price").alias("price1"),
|
|
pl.col("to_txn").struct.field("year").alias("year2"),
|
|
pl.col("to_txn").struct.field("month").alias("month2"),
|
|
pl.col("to_txn").struct.field("price").alias("price2"),
|
|
)
|
|
.with_columns(
|
|
(
|
|
pl.col("year1").cast(pl.Float64)
|
|
+ (pl.col("month1").cast(pl.Float64) - 1.0) / 12.0
|
|
).alias("frac_year1"),
|
|
(
|
|
pl.col("year2").cast(pl.Float64)
|
|
+ (pl.col("month2").cast(pl.Float64) - 1.0) / 12.0
|
|
).alias("frac_year2"),
|
|
)
|
|
.select(
|
|
"sector",
|
|
"type_group",
|
|
"year1",
|
|
"price1",
|
|
"year2",
|
|
"price2",
|
|
"frac_year1",
|
|
"frac_year2",
|
|
)
|
|
.filter(
|
|
pl.col("price1") > 0,
|
|
pl.col("price2") > 0,
|
|
pl.col("frac_year2") > pl.col("frac_year1"),
|
|
)
|
|
.with_columns(
|
|
(pl.col("price2").cast(pl.Float64) / pl.col("price1").cast(pl.Float64))
|
|
.log()
|
|
.alias("log_ratio"),
|
|
(
|
|
1.0
|
|
/ (pl.col("frac_year2") - pl.col("frac_year1")).cast(pl.Float64).sqrt()
|
|
).alias("weight"),
|
|
)
|
|
.filter(pl.col("log_ratio").abs() <= OUTLIER_THRESHOLD)
|
|
.collect()
|
|
)
|
|
|
|
if max_year2 is not None:
|
|
pairs = pairs.filter(pl.col("year2") < max_year2)
|
|
|
|
# Add hierarchy columns
|
|
pairs = pairs.with_columns(
|
|
pl.col("sector").str.replace(r"\s+\d+$", "").alias("district"),
|
|
).with_columns(
|
|
pl.col("district").str.replace(r"\d.*$", "").alias("area"),
|
|
)
|
|
|
|
print(f" {len(pairs):,} pairs extracted")
|
|
return pairs
|
|
|
|
|
|
def solve_robust_index(
|
|
years1: np.ndarray,
|
|
years2: np.ndarray,
|
|
log_ratios: np.ndarray,
|
|
base_weights: np.ndarray,
|
|
) -> dict[int, float]:
|
|
"""IRLS Huber M-estimation for the Case-Shiller repeat-sales model."""
|
|
n = len(years1)
|
|
if n < MIN_PAIRS:
|
|
return {}
|
|
|
|
all_years = np.union1d(years1, years2)
|
|
min_year = int(all_years.min())
|
|
|
|
col = 0
|
|
year_to_col = {}
|
|
for y in all_years:
|
|
iy = int(y)
|
|
if iy != min_year:
|
|
year_to_col[iy] = col
|
|
col += 1
|
|
n_cols = len(year_to_col)
|
|
if n_cols == 0:
|
|
return {}
|
|
|
|
# Vectorized column index mapping
|
|
col2 = np.full(n, -1, dtype=np.int32)
|
|
col1 = np.full(n, -1, dtype=np.int32)
|
|
for year, c in year_to_col.items():
|
|
col2[years2 == year] = c
|
|
col1[years1 == year] = c
|
|
|
|
# Sparse matrix structure (fixed across iterations)
|
|
mask2 = col2 >= 0
|
|
mask1 = col1 >= 0
|
|
rows_arr = np.concatenate([np.where(mask2)[0], np.where(mask1)[0]])
|
|
cols_arr = np.concatenate([col2[mask2], col1[mask1]])
|
|
signs_arr = np.concatenate([np.ones(mask2.sum()), -np.ones(mask1.sum())])
|
|
|
|
weights = base_weights.copy()
|
|
|
|
for _ in range(IRLS_ITERATIONS):
|
|
data = signs_arr * weights[rows_arr]
|
|
A = csc_matrix((data, (rows_arr, cols_arr)), shape=(n, n_cols))
|
|
b = log_ratios * weights
|
|
betas = lsqr(A, b, atol=1e-10, btol=1e-10)[0]
|
|
|
|
# Residuals
|
|
predicted = np.zeros(n)
|
|
predicted[mask2] += betas[col2[mask2]]
|
|
predicted[mask1] -= betas[col1[mask1]]
|
|
residuals = log_ratios - predicted
|
|
|
|
# Huber reweighting
|
|
abs_r = np.abs(residuals)
|
|
huber_w = np.where(abs_r <= HUBER_K, 1.0, HUBER_K / np.maximum(abs_r, 1e-10))
|
|
weights = base_weights * huber_w
|
|
|
|
index = {min_year: 0.0}
|
|
for year, c in year_to_col.items():
|
|
index[year] = float(betas[c])
|
|
return index
|
|
|
|
|
|
def compute_indices_for_level(pairs: pl.DataFrame, group_col: str):
|
|
"""Solve robust indices for each group. Returns (indices, n_pairs) dicts."""
|
|
groups = pairs.group_by(group_col).agg(
|
|
pl.col("year1"),
|
|
pl.col("year2"),
|
|
pl.col("log_ratio"),
|
|
pl.col("weight"),
|
|
)
|
|
indices = {}
|
|
n_pairs = {}
|
|
for row in tqdm(
|
|
groups.iter_rows(named=True), total=len(groups), desc=f" {group_col}"
|
|
):
|
|
key = row[group_col]
|
|
y1 = np.array(row["year1"], dtype=np.int32)
|
|
y2 = np.array(row["year2"], dtype=np.int32)
|
|
lr = np.array(row["log_ratio"], dtype=np.float64)
|
|
w = np.array(row["weight"], dtype=np.float64)
|
|
idx = solve_robust_index(y1, y2, lr, w)
|
|
if idx:
|
|
indices[key] = idx
|
|
n_pairs[key] = len(y1)
|
|
return indices, n_pairs
|
|
|
|
|
|
def compute_hedonic_index(
|
|
input_path: Path,
|
|
min_year: int,
|
|
max_year: int,
|
|
max_sale_year: int | None = None,
|
|
) -> dict[int, float]:
|
|
"""Quality-adjusted hedonic index: regress log(price) on features, average residual by year.
|
|
|
|
Used as the ultimate shrinkage fallback for the repeat-sales index.
|
|
If max_sale_year is set, only sales before that year are used (backtesting holdout).
|
|
"""
|
|
effective_max = max_sale_year - 1 if max_sale_year is not None else max_year
|
|
print("Computing hedonic index...")
|
|
df = (
|
|
pl.scan_parquet(input_path)
|
|
.select(
|
|
"Last known price",
|
|
"Date of last transaction",
|
|
"Property type",
|
|
"Total floor area (sqm)",
|
|
)
|
|
.filter(
|
|
pl.col("Last known price").is_not_null(),
|
|
pl.col("Total floor area (sqm)").is_not_null(),
|
|
pl.col("Total floor area (sqm)") > 0,
|
|
)
|
|
.with_columns(
|
|
pl.col("Date of last transaction").dt.year().alias("sale_year"),
|
|
type_group_expr(),
|
|
)
|
|
.filter(
|
|
pl.col("type_group").is_not_null(),
|
|
pl.col("sale_year").is_not_null(),
|
|
pl.col("sale_year") >= min_year,
|
|
pl.col("sale_year") <= effective_max,
|
|
)
|
|
.collect()
|
|
)
|
|
print(f" {len(df):,} complete cases for hedonic model")
|
|
|
|
# Target
|
|
log_price = np.log(df["Last known price"].to_numpy().astype(np.float64))
|
|
sale_years = df["sale_year"].to_numpy()
|
|
|
|
# Build feature matrix (5 hedonic features + intercept)
|
|
X = build_hedonic_features(df)
|
|
F = np.hstack([X, np.ones((len(df), 1), dtype=np.float32)])
|
|
print(f" Feature matrix: {F.shape[0]:,} x {F.shape[1]}")
|
|
|
|
# Step 1: regress log(price) on features -> quality score
|
|
betas = np.linalg.lstsq(F.astype(np.float64), log_price, rcond=None)[0]
|
|
quality_score = F.astype(np.float64) @ betas
|
|
residuals = log_price - quality_score
|
|
|
|
# Step 2: average residual by year = hedonic index
|
|
hedonic = {}
|
|
for y in range(min_year, max_year + 1):
|
|
mask = sale_years == y
|
|
if mask.sum() > 0:
|
|
hedonic[y] = float(np.mean(residuals[mask]))
|
|
|
|
# Normalize: min_year = 0
|
|
base = hedonic.get(min_year, 0.0)
|
|
for y in hedonic:
|
|
hedonic[y] -= base
|
|
|
|
print(
|
|
f" Hedonic index: {len(hedonic)} years, range {min(hedonic.values()):.3f} to {max(hedonic.values()):.3f}"
|
|
)
|
|
return hedonic
|
|
|
|
|
|
EXTRAPOLATION_YEARS = 3
|
|
|
|
|
|
def forward_fill(index: dict, min_year: int, max_year: int) -> dict:
|
|
"""Forward-fill missing years, with linear extrapolation beyond last known year."""
|
|
if not index:
|
|
return {y: 0.0 for y in range(min_year, max_year + 1)}
|
|
|
|
sorted_years = sorted(index.keys())
|
|
last_known_year = sorted_years[-1]
|
|
|
|
# Forward fill up to last known year
|
|
filled = {}
|
|
last = 0.0
|
|
for y in range(min_year, last_known_year + 1):
|
|
if y in index:
|
|
last = index[y]
|
|
filled[y] = last
|
|
|
|
# Linear extrapolation beyond last known year
|
|
if last_known_year < max_year:
|
|
recent = [
|
|
(y, index[y])
|
|
for y in sorted_years
|
|
if y >= last_known_year - EXTRAPOLATION_YEARS
|
|
]
|
|
if len(recent) >= 2:
|
|
years_arr = np.array([r[0] for r in recent], dtype=np.float64)
|
|
vals_arr = np.array([r[1] for r in recent], dtype=np.float64)
|
|
slope = np.polyfit(years_arr, vals_arr, 1)[0]
|
|
for y in range(last_known_year + 1, max_year + 1):
|
|
filled[y] = index[last_known_year] + slope * (y - last_known_year)
|
|
else:
|
|
for y in range(last_known_year + 1, max_year + 1):
|
|
filled[y] = index[last_known_year]
|
|
|
|
return filled
|
|
|
|
|
|
def build_index(
|
|
input_path: Path,
|
|
max_pair_year: int | None = None,
|
|
postcodes_path: Path | None = None,
|
|
) -> pl.DataFrame:
|
|
"""Build the full price index from raw data.
|
|
|
|
If max_pair_year is set, only pairs before that year are used (backtesting holdout).
|
|
The index is still forward-filled to CURRENT_YEAR.
|
|
postcodes_path: if provided, lat/lon are read from this file instead of input_path.
|
|
"""
|
|
pairs = extract_pairs(input_path, max_year2=max_pair_year)
|
|
centroids = extract_centroids(postcodes_path or input_path)
|
|
|
|
min_year = int(pairs["year1"].min())
|
|
max_year = CURRENT_YEAR
|
|
|
|
hedonic_idx = compute_hedonic_index(
|
|
input_path, min_year, max_year, max_sale_year=max_pair_year
|
|
)
|
|
|
|
# Precompute hierarchy
|
|
all_sectors = pairs["sector"].unique().to_list()
|
|
sector_to_dist = {}
|
|
dist_to_area = {}
|
|
for s in all_sectors:
|
|
d, a = hierarchy_keys(s)
|
|
sector_to_dist[s] = d
|
|
dist_to_area[d] = a
|
|
|
|
# Process each type group + "All"
|
|
all_type_groups = ["All"] + TYPE_GROUPS
|
|
final = {} # {type_group: {sector: {year: log_index}}}
|
|
final_n = {} # {type_group: {sector: n_pairs}}
|
|
|
|
for tg in all_type_groups:
|
|
print(f"\n--- {tg} ---")
|
|
typed = pairs if tg == "All" else pairs.filter(pl.col("type_group") == tg)
|
|
if len(typed) < MIN_PAIRS:
|
|
print(f" Skipping (only {len(typed)} pairs)")
|
|
final[tg] = {s: dict(hedonic_idx) for s in all_sectors}
|
|
final_n[tg] = {s: 0 for s in all_sectors}
|
|
continue
|
|
|
|
print(f" {len(typed):,} pairs")
|
|
|
|
# National
|
|
np_arrs = typed.select("year1", "year2", "log_ratio", "weight")
|
|
national_idx = solve_robust_index(
|
|
np_arrs["year1"].to_numpy(),
|
|
np_arrs["year2"].to_numpy(),
|
|
np_arrs["log_ratio"].to_numpy(),
|
|
np_arrs["weight"].to_numpy(),
|
|
)
|
|
national_n = len(typed)
|
|
print(f" National: {len(national_idx)} years")
|
|
|
|
# Area, district, sector
|
|
print(" Computing per-level indices:")
|
|
area_idx, area_n = compute_indices_for_level(typed, "area")
|
|
district_idx, district_n = compute_indices_for_level(typed, "district")
|
|
sector_idx, sector_n = compute_indices_for_level(typed, "sector")
|
|
print(
|
|
f" {len(area_idx)} areas, {len(district_idx)} districts, {len(sector_idx)} sectors"
|
|
)
|
|
|
|
# Shrinkage: national -> hedonic first, then hierarchical
|
|
print(" Applying shrinkage...")
|
|
national_shrunk = shrink_dicts(national_idx, hedonic_idx, national_n)
|
|
sector_shrunk = hierarchical_shrinkage(
|
|
sector_idx,
|
|
sector_n,
|
|
district_idx,
|
|
district_n,
|
|
area_idx,
|
|
area_n,
|
|
national_shrunk,
|
|
all_sectors,
|
|
sector_to_dist,
|
|
dist_to_area,
|
|
shrink_dicts,
|
|
)
|
|
|
|
# Spatial smoothing
|
|
print(" Spatial smoothing...")
|
|
sector_smoothed = spatial_smooth(
|
|
sector_shrunk, centroids, sector_n, blend_dicts
|
|
)
|
|
|
|
# Forward fill
|
|
for sec in all_sectors:
|
|
sector_smoothed[sec] = forward_fill(
|
|
sector_smoothed.get(sec, hedonic_idx), min_year, max_year
|
|
)
|
|
|
|
final[tg] = sector_smoothed
|
|
final_n[tg] = sector_n
|
|
|
|
# Assemble output
|
|
print("\nAssembling output...")
|
|
rows = []
|
|
for tg in all_type_groups:
|
|
for sec in all_sectors:
|
|
n = final_n[tg].get(sec, 0)
|
|
for year, log_idx in final[tg][sec].items():
|
|
rows.append((sec, tg, year, log_idx, n))
|
|
|
|
return pl.DataFrame(
|
|
rows,
|
|
schema={
|
|
"sector": pl.String,
|
|
"type_group": pl.String,
|
|
"year": pl.Int32,
|
|
"log_index": pl.Float64,
|
|
"n_pairs": pl.Int64,
|
|
},
|
|
orient="row",
|
|
).sort("type_group", "sector", "year")
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Build improved repeat-sales price index"
|
|
)
|
|
parser.add_argument("--input", type=Path, required=True)
|
|
parser.add_argument(
|
|
"--postcodes",
|
|
type=Path,
|
|
required=True,
|
|
help="Path to postcode.parquet (for lat/lon centroids)",
|
|
)
|
|
parser.add_argument("--output", type=Path, required=True)
|
|
args = parser.parse_args()
|
|
|
|
result = build_index(args.input, postcodes_path=args.postcodes)
|
|
|
|
result.write_parquet(args.output)
|
|
size_mb = args.output.stat().st_size / (1024 * 1024)
|
|
print(f"\nWrote {args.output} ({size_mb:.1f} MB)")
|
|
print(
|
|
f" {result['sector'].n_unique():,} sectors x {result['type_group'].n_unique()} types x {result['year'].n_unique()} years = {len(result):,} rows"
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|