210 lines
7.2 KiB
Python
210 lines
7.2 KiB
Python
"""kNN price estimation using nearby recently-sold properties.
|
|
|
|
For each target property, finds k nearest sold properties of the same type,
|
|
computes the median index-adjusted price-per-sqm, and multiplies by the
|
|
target's floor area to produce an estimate.
|
|
"""
|
|
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
import polars as pl
|
|
from scipy.spatial import KDTree
|
|
|
|
from pipeline.transform.price_estimation.utils import (
|
|
TYPE_GROUPS,
|
|
interpolate_log_index,
|
|
sector_expr,
|
|
type_group_expr,
|
|
)
|
|
|
|
# --- Neighbour search -------------------------------------------------------
# Default number of nearest comparables whose adjusted PSM is medianed.
KNN_K: int = 20
# A type group needs at least this many pool rows to get a KD-tree, and a
# target needs at least this many usable neighbours to get an estimate.
KNN_MIN_NEIGHBORS: int = 5
# NOTE(review): not referenced in this part of the file; presumably the weight
# given to the kNN estimate when it is blended with another estimate - confirm
# at the call site.
KNN_BLEND_WEIGHT: float = 0.35

# --- Comparable sanity bounds -------------------------------------------------
# Pool sales whose floor area (sqm) falls outside this closed range are dropped.
MIN_COMPARABLE_FLOOR_AREA_SQM: float = 15.0
MAX_COMPARABLE_FLOOR_AREA_SQM: float = 1_000.0
# Pool sales whose index-adjusted price per sqm falls outside this closed range
# are dropped.
MIN_COMPARABLE_PSM: float = 500.0
MAX_COMPARABLE_PSM: float = 50_000.0
|
|
|
|
|
|
def _scale_coords(lat: np.ndarray, lon: np.ndarray) -> np.ndarray:
    """Map latitude/longitude in degrees to planar coordinates for kNN lookups.

    Uses the equirectangular approximation: longitude is multiplied by
    ``cos(latitude)`` so that Euclidean distance between output rows
    approximates relative ground distance. Latitude is passed through as-is.

    Returns an array whose columns are ``(lat, scaled lon)``.
    """
    lon_shrink = np.cos(np.radians(lat))
    scaled_lon = lon * lon_shrink
    return np.column_stack((lat, scaled_lon))
|
|
|
|
|
|
def build_knn_pool(
    source: Path | pl.LazyFrame,
    index: pl.DataFrame,
    ref_frac_year: float,
    max_sale_year: int | None = None,
) -> dict[str, tuple[KDTree, np.ndarray, np.ndarray, np.ndarray, np.ndarray]]:
    """Assemble one KD-tree of comparable sales per property type group.

    A ``Path`` source is scanned lazily as parquet; a ``LazyFrame`` is used as
    given. Rows are kept only when they have coordinates, a postcode, a sale
    date, a positive price and a floor area within
    ``[MIN_COMPARABLE_FLOOR_AREA_SQM, MAX_COMPARABLE_FLOOR_AREA_SQM]``. When
    ``max_sale_year`` is given, only sales whose year is strictly earlier are
    kept.

    Each sale's price per sqm is moved to ``ref_frac_year`` by multiplying with
    ``exp(log_index_ref - log_index_sale)``, where both log-index values come
    from ``interpolate_log_index`` keyed on sector and type group. Rows whose
    adjusted PSM is null, non-finite or outside
    ``[MIN_COMPARABLE_PSM, MAX_COMPARABLE_PSM]`` are dropped, and type groups
    left with fewer than ``KNN_MIN_NEIGHBORS`` rows get no tree.

    Returns a dict mapping type group to
    ``(tree, adjusted_psm, postcodes, sale_prices, sale_dates)``. The four
    arrays are row-aligned with the tree's points; ``sale_dates`` are days
    since the Unix epoch. Postcode, price and date let callers keep a target's
    own sale out of its comparable set.
    """
    print("Building kNN pool...")

    floor_area = pl.col("Total floor area (sqm)")
    price = pl.col("Last known price")
    sale_date = pl.col("Date of last transaction")

    lazy_source = pl.scan_parquet(source) if isinstance(source, Path) else source
    wanted_columns = (
        "Postcode",
        "Property type",
        "lat",
        "lon",
        "Total floor area (sqm)",
        "Last known price",
        "Date of last transaction",
    )
    usable_row_predicates = (
        pl.col("lat").is_not_null(),
        pl.col("lon").is_not_null(),
        floor_area.is_not_null(),
        floor_area >= MIN_COMPARABLE_FLOOR_AREA_SQM,
        floor_area <= MAX_COMPARABLE_FLOOR_AREA_SQM,
        price.is_not_null(),
        price > 0,
        pl.col("Postcode").is_not_null(),
        sale_date.is_not_null(),
    )
    query = lazy_source.select(*wanted_columns).filter(*usable_row_predicates)
    if max_sale_year is not None:
        query = query.filter(sale_date.dt.year() < max_sale_year)

    # Fractional year of the sale at month resolution: January -> year + 0/12.
    sale_frac_year = (
        sale_date.dt.year().cast(pl.Float64)
        + (sale_date.dt.month().cast(pl.Float64) - 1.0) / 12.0
    )
    pool = query.with_columns(
        sector_expr(),
        type_group_expr(),
        sale_frac_year.alias("_sale_fy"),
        pl.lit(ref_frac_year).alias("_ref_fy"),
    ).collect()
    pool = pool.filter(pl.col("type_group").is_not_null())
    print(f" {len(pool):,} pool properties with lat/lon, floor area, price")

    # Log index at the sale date first, then at the reference date.
    for frac_year_col, log_index_col in (
        ("_sale_fy", "_li_sale"),
        ("_ref_fy", "_li_ref"),
    ):
        pool = interpolate_log_index(
            index, pool, "sector", "type_group", frac_year_col, log_index_col
        )

    # adjusted_psm = price / floor_area * exp(log_index_ref - log_index_sale)
    psm_at_sale = price.cast(pl.Float64) / floor_area.cast(pl.Float64)
    index_uplift = (pl.col("_li_ref") - pl.col("_li_sale")).exp()
    pool = pool.with_columns((psm_at_sale * index_uplift).alias("_adj_psm"))

    adjusted_psm = pl.col("_adj_psm")
    pool = pool.filter(
        adjusted_psm.is_not_null(),
        adjusted_psm.is_finite(),
        adjusted_psm >= MIN_COMPARABLE_PSM,
        adjusted_psm <= MAX_COMPARABLE_PSM,
    )
    print(f" {len(pool):,} after index adjustment")

    def _float_column(frame: pl.DataFrame, name: str) -> np.ndarray:
        return frame[name].to_numpy().astype(np.float64)

    # One KD-tree per type group, with row-aligned side arrays.
    trees: dict[str, tuple[KDTree, np.ndarray, np.ndarray, np.ndarray, np.ndarray]] = {}
    for tg in TYPE_GROUPS:
        members = pool.filter(pl.col("type_group") == tg)
        n = len(members)
        if n < KNN_MIN_NEIGHBORS:
            continue

        lat = _float_column(members, "lat")
        lon = _float_column(members, "lon")
        psm = _float_column(members, "_adj_psm")
        prices = _float_column(members, "Last known price")
        postcodes = members["Postcode"].fill_null("").to_numpy()

        epoch_days = members["Date of last transaction"].dt.epoch("d")
        sale_dates = epoch_days.fill_null(-1).to_numpy().astype(np.int64)

        tree = KDTree(_scale_coords(lat, lon))
        trees[tg] = (tree, psm, postcodes, prices, sale_dates)
        print(f" {tg}: {n:,}")

    return trees
|
|
|
|
|
|
def _sale_identity_matches(
    pool_postcodes: np.ndarray,
    pool_prices: np.ndarray,
    pool_sale_dates: np.ndarray,
    target_postcode: str,
    target_price: float,
    target_sale_date: int,
) -> np.ndarray:
    """Flag pool rows that look like the target's own recorded sale.

    A pool row matches when its postcode equals ``target_postcode``, its sale
    date equals ``target_sale_date`` and its price is finite and within 0.5 of
    ``target_price``.

    When the target has no usable identity (empty postcode, non-finite price
    or negative sale date) nothing matches.

    Returns a boolean array with one entry per pool row.
    """
    # Guard clauses: a target without a full sale identity cannot match anything.
    if not target_postcode:
        return np.zeros(len(pool_postcodes), dtype=bool)
    if not np.isfinite(target_price):
        return np.zeros(len(pool_postcodes), dtype=bool)
    if target_sale_date < 0:
        return np.zeros(len(pool_postcodes), dtype=bool)

    same_postcode = pool_postcodes == target_postcode
    # Absolute tolerance only: prices are compared to within half a currency unit.
    same_price = np.isfinite(pool_prices) & np.isclose(
        pool_prices, target_price, rtol=0.0, atol=0.5
    )
    same_date = pool_sale_dates == target_sale_date
    return same_postcode & same_price & same_date
|
|
|
|
|
|
def knn_median_psm(
    trees: dict[str, tuple[KDTree, np.ndarray, np.ndarray, np.ndarray, np.ndarray]],
    lat: np.ndarray,
    lon: np.ndarray,
    type_groups: np.ndarray,
    k: int = KNN_K,
    postcodes: np.ndarray | None = None,
    last_prices: np.ndarray | None = None,
    last_sale_dates: np.ndarray | None = None,
) -> np.ndarray:
    """Return median adjusted-PSM of k nearest neighbours for each target.

    PSM is at the reference date used when building the pool.
    NaN where not computable (missing coords, unknown type, too few neighbors).

    Args:
        trees: Per-type-group ``(tree, psm, postcodes, prices, sale_dates)``
            tuples; the arrays are indexed by the tree's point indices.
        lat, lon: Target coordinates in degrees; non-finite entries yield NaN.
        type_groups: Target type group per row; rows whose group has no tree
            yield NaN.
        k: Number of nearest neighbours whose PSM is medianed.
        postcodes, last_prices, last_sale_dates: Optional identity of each
            target's own last sale. Self-exclusion is active only when all
            three are supplied; pool rows matching a target's postcode, price
            and sale date are then removed from that target's candidates.
            Each entry is converted with ``str`` / ``float`` / ``int``; a
            negative date, empty postcode or non-finite price means the
            target has no sale identity and nothing is excluded for it.

    Returns:
        Float array with one median per target, aligned with ``lat``.
    """
    result = np.full(len(lat), np.nan)

    # Both checks are independent of the type group, so evaluate them once.
    has_coords = np.isfinite(lat) & np.isfinite(lon)
    exclude_own_sale = (
        postcodes is not None
        and last_prices is not None
        and last_sale_dates is not None
    )

    for tg, (tree, psm, pool_postcodes, pool_prices, pool_sale_dates) in trees.items():
        idx = np.flatnonzero((type_groups == tg) & has_coords)
        if len(idx) == 0:
            continue

        # Over-fetch so that k neighbours remain after dropping the target's
        # own sale, but never ask for more points than the tree holds.
        query_k = min(max(k * 2, k + KNN_MIN_NEIGHBORS), len(psm))
        if query_k < KNN_MIN_NEIGHBORS:
            continue

        _, nn_idx = tree.query(_scale_coords(lat[idx], lon[idx]), k=query_k)
        if nn_idx.ndim == 1:
            # KDTree.query drops the neighbour axis when k == 1.
            nn_idx = nn_idx.reshape(-1, 1)

        if not exclude_own_sale:
            # No candidates are removed, so every row keeps query_k
            # (>= KNN_MIN_NEIGHBORS) neighbours and the median of the nearest
            # k can be taken for all rows at once.
            result[idx] = np.nanmedian(psm[nn_idx[:, :k]], axis=1)
            continue

        medians = np.full(len(idx), np.nan)
        for row_num, target_idx in enumerate(idx):
            candidates = nn_idx[row_num]
            same_sale = _sale_identity_matches(
                pool_postcodes[candidates],
                pool_prices[candidates],
                pool_sale_dates[candidates],
                str(postcodes[target_idx] or ""),
                float(last_prices[target_idx]),
                int(last_sale_dates[target_idx]),
            )
            # Neighbours come back nearest-first, so order is preserved here.
            candidates = candidates[~same_sale]
            if len(candidates) >= KNN_MIN_NEIGHBORS:
                medians[row_num] = np.nanmedian(psm[candidates[:k]])

        result[idx] = medians

    return result
|