"""kNN price estimation using nearby recently-sold properties.

For each target property, finds k nearest sold properties of the same type,
computes the median index-adjusted price-per-sqm, and multiplies by the
target's floor area to produce an estimate. This module builds the comparable
pool / KD-trees and computes the per-target median price-per-sqm.
"""

from pathlib import Path

import numpy as np
import polars as pl
from scipy.spatial import KDTree

from pipeline.transform.price_estimation.utils import (
    TYPE_GROUPS,
    interpolate_log_index,
    sector_expr,
    type_group_expr,
)

# Default number of neighbours whose median PSM is taken.
KNN_K = 20
# Minimum pool size per type group, and minimum surviving candidates per target.
KNN_MIN_NEIGHBORS = 5
# Not used in this module; presumably consumed by callers that blend the kNN
# estimate with another estimate — verify against callers.
KNN_BLEND_WEIGHT = 0.35

# Sanity bounds applied to pool rows before they can act as comparables.
MIN_COMPARABLE_FLOOR_AREA_SQM = 15.0
MAX_COMPARABLE_FLOOR_AREA_SQM = 1_000.0
MIN_COMPARABLE_PSM = 500.0
MAX_COMPARABLE_PSM = 50_000.0

# Per-type-group pool entry:
# (KD-tree over scaled coords, adjusted PSM, postcodes, last prices,
#  sale dates as days since the Unix epoch).
KnnTreeEntry = tuple[KDTree, np.ndarray, np.ndarray, np.ndarray, np.ndarray]


def _scale_coords(lat: np.ndarray, lon: np.ndarray) -> np.ndarray:
    """Equirectangular projection: scale lon by cos(lat) for approximate distances."""
    return np.column_stack([lat, lon * np.cos(np.radians(lat))])


def build_knn_pool(
    source: str | Path | pl.LazyFrame,
    index: pl.DataFrame,
    ref_frac_year: float,
    max_sale_year: int | None = None,
) -> dict[str, KnnTreeEntry]:
    """Build per-type_group KD-trees of index-adjusted price-per-sqm.

    Every pool sale is moved to ``ref_frac_year`` by multiplying its raw
    price-per-sqm by ``exp(log_index_ref - log_index_sale)``. A KD-tree is then
    built per type group for nearest-neighbour queries.

    Args:
        source: Parquet file path (``str`` or ``Path``, scanned lazily) or a
            LazyFrame. It must contain the columns selected below.
        index: Index table handed to ``interpolate_log_index``. Its layout is
            defined by that helper and is not visible here.
        ref_frac_year: Reference date as a fractional year. This uses the same
            ``year + (month - 1) / 12`` convention as the sale dates.
        max_sale_year: If given, only sales with year strictly less than this
            value enter the pool. The bound is exclusive.

    Returns:
        Dict mapping each type group to a ``KnnTreeEntry``. The entry holds the
        tree, the adjusted PSM, and the postcode, price and sale-day arrays.
        ``knn_median_psm`` uses the last three to keep a target's own sale out
        of its comparable set. Type groups with fewer than
        ``KNN_MIN_NEIGHBORS`` pool rows are omitted.
    """
    print("Building kNN pool...")
    lf = pl.scan_parquet(source) if isinstance(source, (str, Path)) else source

    sale_date = pl.col("Date of last transaction")
    floor_area = pl.col("Total floor area (sqm)")
    price = pl.col("Last known price")

    query = lf.select(
        "Postcode",
        "Property type",
        "lat",
        "lon",
        "Total floor area (sqm)",
        "Last known price",
        "Date of last transaction",
    ).filter(
        pl.col("lat").is_not_null(),
        pl.col("lon").is_not_null(),
        floor_area.is_not_null(),
        floor_area >= MIN_COMPARABLE_FLOOR_AREA_SQM,
        floor_area <= MAX_COMPARABLE_FLOOR_AREA_SQM,
        price.is_not_null(),
        price > 0,
        pl.col("Postcode").is_not_null(),
        sale_date.is_not_null(),
    )
    if max_sale_year is not None:
        # Exclusive bound: sales in max_sale_year itself stay out of the pool.
        query = query.filter(sale_date.dt.year() < max_sale_year)

    pool = (
        query.with_columns(
            sector_expr(),
            type_group_expr(),
            (
                sale_date.dt.year().cast(pl.Float64)
                + (sale_date.dt.month().cast(pl.Float64) - 1.0) / 12.0
            ).alias("_sale_fy"),
            # Force Float64 so an int reference year (e.g. 2024) matches the
            # Float64 sale fractional year when the index is interpolated.
            pl.lit(float(ref_frac_year), dtype=pl.Float64).alias("_ref_fy"),
        )
        # Drop rows with no type group before collecting.
        .filter(pl.col("type_group").is_not_null())
        .collect()
    )
    print(f" {len(pool):,} pool properties with lat/lon, floor area, price")

    # Interpolate log_index at sale date and reference date
    pool = interpolate_log_index(
        index, pool, "sector", "type_group", "_sale_fy", "_li_sale"
    )
    pool = interpolate_log_index(
        index, pool, "sector", "type_group", "_ref_fy", "_li_ref"
    )

    # adjusted_psm = price / floor_area * exp(log_index_ref - log_index_sale).
    # Rows where either log index is missing give null and are dropped.
    pool = pool.with_columns(
        (
            pl.col("Last known price").cast(pl.Float64)
            / pl.col("Total floor area (sqm)").cast(pl.Float64)
            * (pl.col("_li_ref") - pl.col("_li_sale")).exp()
        ).alias("_adj_psm")
    ).filter(
        pl.col("_adj_psm").is_not_null(),
        pl.col("_adj_psm").is_finite(),
        pl.col("_adj_psm") >= MIN_COMPARABLE_PSM,
        pl.col("_adj_psm") <= MAX_COMPARABLE_PSM,
    )
    print(f" {len(pool):,} after index adjustment")

    # Build per-type KD-trees
    trees: dict[str, KnnTreeEntry] = {}
    for tg in TYPE_GROUPS:
        sub = pool.filter(pl.col("type_group") == tg)
        n = len(sub)
        if n < KNN_MIN_NEIGHBORS:
            continue
        lat = sub["lat"].to_numpy().astype(np.float64)
        lon = sub["lon"].to_numpy().astype(np.float64)
        psm = sub["_adj_psm"].to_numpy().astype(np.float64)
        postcodes = sub["Postcode"].fill_null("").to_numpy()
        prices = sub["Last known price"].to_numpy().astype(np.float64)
        sale_dates = (
            sub["Date of last transaction"]
            .dt.epoch("d")
            .fill_null(-1)
            .to_numpy()
            .astype(np.int64)
        )
        tree = KDTree(_scale_coords(lat, lon))
        trees[tg] = (tree, psm, postcodes, prices, sale_dates)
        print(f" {tg}: {n:,}")
    return trees


def _sale_identity_matches(
    pool_postcodes: np.ndarray,
    pool_prices: np.ndarray,
    pool_sale_dates: np.ndarray,
    target_postcode: str,
    target_price: float,
    target_sale_date: int,
) -> np.ndarray:
    """Flag pool rows that record the same sale as the target.

    A row matches when it has the same postcode, a price within 0.5 of the
    target price, and the same sale day. If the target identity is incomplete
    (empty postcode, non-finite price or negative day), nothing matches.
    Note that any pool sale with this identity is flagged, not only the
    target's own row.
    """
    if not target_postcode or not np.isfinite(target_price) or target_sale_date < 0:
        return np.zeros(len(pool_postcodes), dtype=bool)
    return (
        (pool_postcodes == target_postcode)
        & np.isfinite(pool_prices)
        & np.isclose(pool_prices, target_price, rtol=0.0, atol=0.5)
        & (pool_sale_dates == target_sale_date)
    )


def _target_identity(postcode, price, sale_date) -> tuple[str, float, int]:
    """Coerce one target's sale identity to ``(postcode, price, sale_day)``.

    Missing or unconvertible values map to the sentinels that
    ``_sale_identity_matches`` treats as "no identity". Those sentinels are
    ``""`` for the postcode, NaN for the price and ``-1`` for the sale day.
    """
    if postcode is None or (isinstance(postcode, float) and np.isnan(postcode)):
        postcode_str = ""
    else:
        postcode_str = str(postcode)

    try:
        price_f = np.nan if price is None else float(price)
    except (TypeError, ValueError):
        price_f = np.nan

    try:
        day = -1 if sale_date is None else int(sale_date)
    except (TypeError, ValueError, OverflowError):  # e.g. NaN / inf
        day = -1

    return postcode_str, price_f, day


def knn_median_psm(
    trees: dict[str, KnnTreeEntry],
    lat: np.ndarray,
    lon: np.ndarray,
    type_groups: np.ndarray,
    k: int = KNN_K,
    postcodes: np.ndarray | None = None,
    last_prices: np.ndarray | None = None,
    last_sale_dates: np.ndarray | None = None,
) -> np.ndarray:
    """Return median adjusted-PSM of k nearest neighbours for each target.

    PSM is at the reference date used when building the pool.

    Args:
        trees: Output of ``build_knn_pool``.
        lat, lon: Target coordinates in degrees, one entry per target.
        type_groups: Target type group per target. Targets whose group has no
            tree get NaN.
        k: Number of nearest (surviving) neighbours to take the median over.
        postcodes, last_prices, last_sale_dates: Optional target sale
            identity. Sale dates are in days since the Unix epoch, matching the
            pool. When all three are given, pool rows recording the same sale
            are excluded before taking the k nearest. If any of the three is
            None, no exclusion is done.

    Returns:
        Float array of length ``len(lat)``. It is NaN where the value is not
        computable: missing coords, unknown type, or too few neighbours.
    """
    n = len(lat)
    result = np.full(n, np.nan)
    exclude_own_sale = (
        postcodes is not None
        and last_prices is not None
        and last_sale_dates is not None
    )
    finite_coords = np.isfinite(lat) & np.isfinite(lon)

    for tg, (tree, psm, pool_postcodes, pool_prices, pool_sale_dates) in trees.items():
        idx = np.flatnonzero((type_groups == tg) & finite_coords)
        if idx.size == 0:
            continue
        # Over-fetch so that enough candidates remain after own-sale exclusion.
        query_k = min(max(k * 2, k + KNN_MIN_NEIGHBORS), len(psm))
        if query_k < KNN_MIN_NEIGHBORS:
            continue
        _, nn_idx = tree.query(_scale_coords(lat[idx], lon[idx]), k=query_k)
        if nn_idx.ndim == 1:  # scipy returns 1-D results when k == 1
            nn_idx = nn_idx.reshape(-1, 1)

        if not exclude_own_sale:
            # Every row keeps all query_k (>= KNN_MIN_NEIGHBORS) candidates,
            # so the per-row median can be computed in one vectorised call.
            result[idx] = np.nanmedian(psm[nn_idx[:, :k]], axis=1)
            continue

        medians = np.full(idx.size, np.nan)
        for row, target in enumerate(idx):
            candidates = nn_idx[row]
            own_sale = _sale_identity_matches(
                pool_postcodes[candidates],
                pool_prices[candidates],
                pool_sale_dates[candidates],
                *_target_identity(
                    postcodes[target], last_prices[target], last_sale_dates[target]
                ),
            )
            candidates = candidates[~own_sale]
            if candidates.size >= KNN_MIN_NEIGHBORS:
                medians[row] = np.nanmedian(psm[candidates[:k]])
        result[idx] = medians
    return result