"""Repeat-Sales Price Index (improved) Builds a hierarchical repeat-sales price index with: 1. Stratification by property type (Detached/Semi-Detached/Terraced/Flats) 2. Robust regression (IRLS with Huber weights) instead of hard outlier cutoff 3. National hedonic time-dummy model as ultimate shrinkage fallback 4. Spatial smoothing for sparse sectors via KD-tree nearest neighbors Output: price_index.parquet — sector × type_group × year → log_index """ import argparse from pathlib import Path import numpy as np import polars as pl from scipy.sparse import csc_matrix from scipy.sparse.linalg import lsqr from scipy.spatial import KDTree from tqdm import tqdm # --- Constants --- MIN_PAIRS = 5 SHRINKAGE_K = 50 OUTLIER_THRESHOLD = 3.0 # hard pre-filter; Huber handles the rest HUBER_K = 1.345 IRLS_ITERATIONS = 5 SPATIAL_NEIGHBORS = 5 SPATIAL_BLEND_K = 30 CURRENT_YEAR = 2025 TYPE_GROUPS = ["Detached", "Semi-Detached", "Terraced", "Flats"] TERRACE_TYPES = ["Mid-Terrace", "End-Terrace", "Enclosed Mid-Terrace", "Enclosed End-Terrace"] AGE_BREAKS = [1900, 1930, 1950, 1967, 1983, 2000, 2010] AGE_LABELS = ["pre-1900", "1900-1929", "1930-1949", "1950-1966", "1967-1982", "1983-1999", "2000-2009", "2010+"] def type_group_expr(): """Polars expression: Property type → type_group.""" return ( pl.when(pl.col("Property type").is_in(TERRACE_TYPES)).then(pl.lit("Terraced")) .when(pl.col("Property type") == "Flats/Maisonettes").then(pl.lit("Flats")) .when(pl.col("Property type").is_in(["Detached", "Semi-Detached"])).then(pl.col("Property type")) .otherwise(pl.lit(None)) .alias("type_group") ) def age_band_expr(): """Polars expression: Construction age (UInt16 year) → age band string.""" expr = pl.when(pl.col("Construction age").is_null()).then(pl.lit(None)) for i, brk in enumerate(AGE_BREAKS): expr = expr.when(pl.col("Construction age") < brk).then(pl.lit(AGE_LABELS[i])) return expr.otherwise(pl.lit(AGE_LABELS[-1])).alias("age_band") def sector_expr(): """Polars expression: Postcode → sector (drop last 2 chars, strip).""" return pl.col("Postcode").str.slice(0, pl.col("Postcode").str.len_chars() - 2).str.strip_chars().alias("sector") def hierarchy_keys(sector: str) -> tuple[str, str]: """Return (district, area) for a sector string.""" district = sector.rsplit(" ", 1)[0] if " " in sector else sector area = "" for ch in district: if ch.isalpha(): area += ch else: break return district, area # --- Pair extraction --- def extract_pairs(input_path: Path) -> pl.DataFrame: print("Extracting repeat-sale pairs...") df = ( pl.scan_parquet(input_path) .select("Postcode", "historical_prices", "Property type") .filter(pl.col("Postcode").is_not_null(), pl.col("historical_prices").list.len() >= 2) .with_columns(sector_expr(), type_group_expr()) .collect() ) print(f" {len(df):,} properties with 2+ transactions") pairs = ( df.lazy() .with_columns( pl.col("historical_prices").list.slice(0, pl.col("historical_prices").list.len() - 1).alias("from_txn"), pl.col("historical_prices").list.slice(1).alias("to_txn"), ) .explode("from_txn", "to_txn") .with_columns( pl.col("from_txn").struct.field("year").alias("year1"), pl.col("from_txn").struct.field("price").alias("price1"), pl.col("to_txn").struct.field("year").alias("year2"), pl.col("to_txn").struct.field("price").alias("price2"), ) .select("sector", "type_group", "year1", "price1", "year2", "price2") .filter(pl.col("price1") > 0, pl.col("price2") > 0, pl.col("year2") > pl.col("year1")) .with_columns( (pl.col("price2").cast(pl.Float64) / pl.col("price1").cast(pl.Float64)).log().alias("log_ratio"), (1.0 / (pl.col("year2") - pl.col("year1")).cast(pl.Float64).sqrt()).alias("weight"), ) .filter(pl.col("log_ratio").abs() <= OUTLIER_THRESHOLD) .collect() ) # Add hierarchy columns pairs = pairs.with_columns( pl.col("sector").str.replace(r"\s+\d+$", "").alias("district"), ).with_columns( pl.col("district").str.replace(r"\d.*$", "").alias("area"), ) print(f" {len(pairs):,} pairs extracted") return pairs # --- Sector centroids --- def extract_centroids(input_path: Path) -> dict[str, tuple[float, float]]: print("Computing sector centroids...") df = ( pl.scan_parquet(input_path) .select("Postcode", "lat", "lon") .filter(pl.col("Postcode").is_not_null(), pl.col("lat").is_not_null()) .with_columns(sector_expr()) .group_by("sector") .agg(pl.col("lat").mean(), pl.col("lon").mean()) .collect() ) centroids = {} for row in df.iter_rows(named=True): centroids[row["sector"]] = (row["lat"], row["lon"]) print(f" {len(centroids):,} sector centroids") return centroids # --- Robust IRLS solver --- def solve_robust_index( years1: np.ndarray, years2: np.ndarray, log_ratios: np.ndarray, base_weights: np.ndarray, ) -> dict[int, float]: """IRLS Huber M-estimation for the Case-Shiller repeat-sales model.""" n = len(years1) if n < MIN_PAIRS: return {} all_years = np.union1d(years1, years2) min_year = int(all_years.min()) col = 0 year_to_col = {} for y in all_years: iy = int(y) if iy != min_year: year_to_col[iy] = col col += 1 n_cols = len(year_to_col) if n_cols == 0: return {} # Vectorized column index mapping col2 = np.full(n, -1, dtype=np.int32) col1 = np.full(n, -1, dtype=np.int32) for year, c in year_to_col.items(): col2[years2 == year] = c col1[years1 == year] = c # Sparse matrix structure (fixed across iterations) mask2 = col2 >= 0 mask1 = col1 >= 0 rows_arr = np.concatenate([np.where(mask2)[0], np.where(mask1)[0]]) cols_arr = np.concatenate([col2[mask2], col1[mask1]]) signs_arr = np.concatenate([np.ones(mask2.sum()), -np.ones(mask1.sum())]) weights = base_weights.copy() for _ in range(IRLS_ITERATIONS): data = signs_arr * weights[rows_arr] A = csc_matrix((data, (rows_arr, cols_arr)), shape=(n, n_cols)) b = log_ratios * weights betas = lsqr(A, b, atol=1e-10, btol=1e-10)[0] # Residuals predicted = np.zeros(n) predicted[mask2] += betas[col2[mask2]] predicted[mask1] -= betas[col1[mask1]] residuals = log_ratios - predicted # Huber reweighting abs_r = np.abs(residuals) huber_w = np.where(abs_r <= HUBER_K, 1.0, HUBER_K / np.maximum(abs_r, 1e-10)) weights = base_weights * huber_w index = {min_year: 0.0} for year, c in year_to_col.items(): index[year] = float(betas[c]) return index def compute_indices_for_level(pairs: pl.DataFrame, group_col: str): """Solve robust indices for each group. Returns (indices, n_pairs) dicts.""" groups = pairs.group_by(group_col).agg( pl.col("year1"), pl.col("year2"), pl.col("log_ratio"), pl.col("weight"), ) indices = {} n_pairs = {} for row in tqdm(groups.iter_rows(named=True), total=len(groups), desc=f" {group_col}"): key = row[group_col] y1 = np.array(row["year1"], dtype=np.int32) y2 = np.array(row["year2"], dtype=np.int32) lr = np.array(row["log_ratio"], dtype=np.float64) w = np.array(row["weight"], dtype=np.float64) idx = solve_robust_index(y1, y2, lr, w) if idx: indices[key] = idx n_pairs[key] = len(y1) return indices, n_pairs # --- Hedonic model --- def compute_hedonic_index(input_path: Path, min_year: int, max_year: int) -> dict[int, float]: """Two-step hedonic index: regress log(price) on features, average residual by year.""" print("Computing hedonic index...") df = ( pl.scan_parquet(input_path) .select( "Last known price", "Date of last transaction", "Property type", "Total floor area (sqm)", "Current energy rating", "Number of bedrooms & living rooms", "Construction age", ) .filter( pl.col("Last known price").is_not_null(), pl.col("Total floor area (sqm)").is_not_null(), pl.col("Total floor area (sqm)") > 0, pl.col("Current energy rating").is_in(["A", "B", "C", "D", "E", "F", "G"]), pl.col("Number of bedrooms & living rooms").is_not_null(), pl.col("Construction age").is_not_null(), ) .with_columns( pl.col("Date of last transaction").dt.year().alias("sale_year"), type_group_expr(), age_band_expr(), ) .filter( pl.col("type_group").is_not_null(), pl.col("sale_year").is_not_null(), pl.col("sale_year") >= min_year, pl.col("sale_year") <= max_year, ) .collect() ) print(f" {len(df):,} complete cases for hedonic model") # Target log_price = np.log(df["Last known price"].to_numpy().astype(np.float64)) sale_years = df["sale_year"].to_numpy() # Build feature matrix parts = [] # log(floor_area) fa = df["Total floor area (sqm)"].to_numpy().astype(np.float32) parts.append(np.log(np.maximum(fa, 1.0)).reshape(-1, 1)) # Type dummies (ref: Detached) tg = df["type_group"].to_numpy() for t in ["Terraced", "Semi-Detached", "Flats"]: parts.append((tg == t).astype(np.float32).reshape(-1, 1)) # EPC dummies (ref: D) epc = df["Current energy rating"].to_numpy() for r in ["A", "B", "C", "E", "F", "G"]: parts.append((epc == r).astype(np.float32).reshape(-1, 1)) # Rooms parts.append(df["Number of bedrooms & living rooms"].to_numpy().astype(np.float32).reshape(-1, 1)) # Age band dummies (ref: pre-1900) ab = df["age_band"].to_numpy() for band in AGE_LABELS[1:]: parts.append((ab == band).astype(np.float32).reshape(-1, 1)) # Intercept parts.append(np.ones((len(df), 1), dtype=np.float32)) F = np.hstack(parts) print(f" Feature matrix: {F.shape[0]:,} × {F.shape[1]}") # Step 1: regress log(price) on features → quality score betas = np.linalg.lstsq(F.astype(np.float64), log_price, rcond=None)[0] quality_score = F.astype(np.float64) @ betas residuals = log_price - quality_score # Step 2: average residual by year = hedonic index hedonic = {} for y in range(min_year, max_year + 1): mask = sale_years == y if mask.sum() > 0: hedonic[y] = float(np.mean(residuals[mask])) # Normalize: min_year = 0 base = hedonic.get(min_year, 0.0) for y in hedonic: hedonic[y] -= base print(f" Hedonic index: {len(hedonic)} years, range {min(hedonic.values()):.3f} to {max(hedonic.values()):.3f}") return hedonic # --- Shrinkage --- def shrink_index(raw: dict, parent: dict, n_pairs: int, k: int = SHRINKAGE_K) -> dict: w = n_pairs / (n_pairs + k) result = {} for y in set(raw) | set(parent): r = raw.get(y, parent.get(y, 0.0)) p = parent.get(y, raw.get(y, 0.0)) result[y] = w * r + (1 - w) * p return result def apply_shrinkage( sector_idx, sector_n, district_idx, district_n, area_idx, area_n, national_idx, national_n, hedonic_idx, all_sectors, sector_to_dist, dist_to_area, ): """Top-down hierarchical shrinkage: national→hedonic, area→national, etc.""" # National → hedonic national_shrunk = shrink_index(national_idx, hedonic_idx, national_n) # Area → national area_shrunk = {} for area, idx in area_idx.items(): area_shrunk[area] = shrink_index(idx, national_shrunk, area_n[area]) # District → area district_shrunk = {} for dist, idx in district_idx.items(): a = dist_to_area.get(dist, "") parent = area_shrunk.get(a, national_shrunk) district_shrunk[dist] = shrink_index(idx, parent, district_n[dist]) # Sector → district sector_shrunk = {} for sec, idx in sector_idx.items(): d = sector_to_dist.get(sec, "") parent = district_shrunk.get(d, national_shrunk) sector_shrunk[sec] = shrink_index(idx, parent, sector_n[sec]) # Fill sectors without their own index for sec in all_sectors: if sec not in sector_shrunk: d = sector_to_dist.get(sec, "") a = dist_to_area.get(d, "") sector_shrunk[sec] = district_shrunk.get( d, area_shrunk.get(a, national_shrunk) ) return sector_shrunk # --- Spatial smoothing --- def spatial_smooth( sector_indices: dict, centroids: dict, n_pairs_map: dict, ) -> dict: """Blend sparse sector indices with K nearest neighbors.""" # Build coordinate arrays for sectors with centroids sectors_with_coords = [s for s in sector_indices if s in centroids] if len(sectors_with_coords) < SPATIAL_NEIGHBORS + 1: return sector_indices coords = np.array([centroids[s] for s in sectors_with_coords]) # Scale longitude by cos(mean_lat) for approximate Euclidean distance mean_lat = np.mean(coords[:, 0]) scale = np.cos(np.radians(mean_lat)) scaled_coords = np.column_stack([coords[:, 0], coords[:, 1] * scale]) tree = KDTree(scaled_coords) result = dict(sector_indices) for i, sec in enumerate(sectors_with_coords): n = n_pairs_map.get(sec, 0) self_w = n / (n + SPATIAL_BLEND_K) if self_w > 0.95: continue # enough data, skip smoothing dists, idxs = tree.query(scaled_coords[i], k=SPATIAL_NEIGHBORS + 1) # Skip self (index 0, distance ~0) neighbor_dists = dists[1:] neighbor_idxs = idxs[1:] inv_dists = [] neighbor_indices = [] for d, j in zip(neighbor_dists, neighbor_idxs): ns = sectors_with_coords[j] if d > 0 and ns in sector_indices: inv_dists.append(1.0 / d) neighbor_indices.append(sector_indices[ns]) if not neighbor_indices: continue total_inv = sum(inv_dists) nbr_w = 1.0 - self_w ws = [iw / total_inv * nbr_w for iw in inv_dists] blended = {} all_years = set(sector_indices[sec]) for ni in neighbor_indices: all_years |= set(ni) for y in all_years: val = self_w * sector_indices[sec].get(y, 0.0) for ni, w in zip(neighbor_indices, ws): val += w * ni.get(y, 0.0) blended[y] = val result[sec] = blended return result # --- Forward fill --- def forward_fill(index: dict, min_year: int, max_year: int) -> dict: filled = {} last = 0.0 for y in range(min_year, max_year + 1): if y in index: last = index[y] filled[y] = last return filled # --- Main --- def main(): parser = argparse.ArgumentParser(description="Build improved repeat-sales price index") parser.add_argument("--input", type=Path, required=True) parser.add_argument("--output", type=Path, required=True) args = parser.parse_args() pairs = extract_pairs(args.input) centroids = extract_centroids(args.input) min_year = int(pairs["year1"].min()) max_year = max(int(pairs["year2"].max()), CURRENT_YEAR) hedonic_idx = compute_hedonic_index(args.input, min_year, max_year) # Precompute hierarchy all_sectors = pairs["sector"].unique().to_list() sector_to_dist = {} dist_to_area = {} for s in all_sectors: d, a = hierarchy_keys(s) sector_to_dist[s] = d dist_to_area[d] = a # Process each type group + "All" all_type_groups = ["All"] + TYPE_GROUPS final = {} # {type_group: {sector: {year: log_index}}} final_n = {} # {type_group: {sector: n_pairs}} for tg in all_type_groups: print(f"\n--- {tg} ---") typed = pairs if tg == "All" else pairs.filter(pl.col("type_group") == tg) if len(typed) < MIN_PAIRS: print(f" Skipping (only {len(typed)} pairs)") final[tg] = {s: dict(hedonic_idx) for s in all_sectors} final_n[tg] = {s: 0 for s in all_sectors} continue print(f" {len(typed):,} pairs") # National np_arrs = typed.select("year1", "year2", "log_ratio", "weight") national_idx = solve_robust_index( np_arrs["year1"].to_numpy(), np_arrs["year2"].to_numpy(), np_arrs["log_ratio"].to_numpy(), np_arrs["weight"].to_numpy(), ) national_n = len(typed) print(f" National: {len(national_idx)} years") # Area, district, sector print(" Computing per-level indices:") area_idx, area_n = compute_indices_for_level(typed, "area") district_idx, district_n = compute_indices_for_level(typed, "district") sector_idx, sector_n = compute_indices_for_level(typed, "sector") print(f" {len(area_idx)} areas, {len(district_idx)} districts, {len(sector_idx)} sectors") # Shrinkage print(" Applying shrinkage...") sector_shrunk = apply_shrinkage( sector_idx, sector_n, district_idx, district_n, area_idx, area_n, national_idx, national_n, hedonic_idx, all_sectors, sector_to_dist, dist_to_area, ) # Spatial smoothing print(" Spatial smoothing...") sector_smoothed = spatial_smooth(sector_shrunk, centroids, sector_n) # Forward fill for sec in all_sectors: sector_smoothed[sec] = forward_fill( sector_smoothed.get(sec, hedonic_idx), min_year, max_year ) final[tg] = sector_smoothed final_n[tg] = sector_n # Assemble output print("\nAssembling output...") rows = [] for tg in all_type_groups: for sec in all_sectors: n = final_n[tg].get(sec, 0) for year, log_idx in final[tg][sec].items(): rows.append((sec, tg, year, log_idx, n)) result = pl.DataFrame( rows, schema={"sector": pl.String, "type_group": pl.String, "year": pl.Int32, "log_index": pl.Float64, "n_pairs": pl.Int64}, orient="row", ).sort("type_group", "sector", "year") result.write_parquet(args.output) size_mb = args.output.stat().st_size / (1024 * 1024) print(f"\nWrote {args.output} ({size_mb:.1f} MB)") print(f" {result['sector'].n_unique():,} sectors × {len(all_type_groups)} types × {max_year - min_year + 1} years = {len(result):,} rows") if __name__ == "__main__": main()