perfect-postcode/pipeline/transform/price_estimation/estimate.py

371 lines
14 KiB
Python

"""Augment properties.parquet with estimated current prices.
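For properties with a known prior sale, applies the repeat-sales price index
to adjust the last known price to the current date, then blends with kNN
estimates from nearby recently-sold properties. Includes:
- Capping extreme index adjustments
- kNN spatial blending (only when the kNN estimate is close to the index one)
- Capping the final estimate at a multiple of the last known price
- Nulling estimates whose implied price per sqm is implausible or, where no
  floor area is known, that sit far above the district's recent sale prices
Writes the "Estimated current price" and "Est. price per sqm" columns and
modifies properties.parquet in-place. Temporarily joins postcode.parquet
for lat/lon needed by kNN, then drops those columns before writing.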
"""
import argparse
from pathlib import Path
import numpy as np
import polars as pl
from pipeline.transform.price_estimation.knn import (
KNN_BLEND_WEIGHT,
MAX_COMPARABLE_PSM,
MIN_COMPARABLE_PSM,
build_knn_pool,
knn_median_psm,
)
from pipeline.transform.price_estimation.utils import (
CURRENT_FRAC_YEAR,
CURRENT_YEAR,
MAX_LOG_ADJUSTMENT,
interpolate_log_index,
sector_expr,
type_group_expr,
)
# Stability band for kNN blending: the kNN estimate is blended into the
# index-adjusted estimate only when it lies within
# [MIN_KNN_TO_INDEX_RATIO, MAX_KNN_TO_INDEX_RATIO] times the index estimate;
# outside that band the index estimate is used on its own.
MAX_KNN_TO_INDEX_RATIO = 2.0
MIN_KNN_TO_INDEX_RATIO = 0.5
# Cap the final estimate at this multiple of the last known price as a guard
# against data errors. Set to ~exp(MAX_LOG_ADJUSTMENT) (~20x) so it is
# consistent with the log-index clip already applied to the index move: many
# UK sectors legitimately grew >6x since the 1990s (e.g. parts of inner London
# 12-14x), so the previous 6x cap truncated genuine appreciation rather than
# only catching outliers.
MAX_ESTIMATE_TO_LAST_PRICE_RATIO = 20.0
# Guard for rows with NO usable floor area: the per-sqm plausibility check
# cannot fire there, which let commercial blocks misfiled as dwellings keep
# absurd headline estimates (e.g. a GBP 175M "Detached" in SW1W). Without
# floor area we cannot psm-check, so the only sanity reference left is what
# the local market actually pays: beyond this multiple of the district's
# recent 99th-percentile sale price the estimate is unreliable and misleading,
# so it is nulled rather than shown.
FLOORLESS_ESTIMATE_P99_MULT = 2.0
# Never null a floorless estimate below this absolute value: genuine mansions
# in cheap districts can legitimately exceed 2x their district's recent p99,
# but a sub-GBP 2M estimate is within the plausible single-dwelling range
# anywhere in the UK, so it survives regardless of the local p99.
FLOORLESS_ESTIMATE_MIN_CAP = 2_000_000.0
# Look-back window for the district p99 reference: long enough that thin
# districts accumulate a usable sale sample, short enough that the reference
# reflects today's price level rather than a pre-boom one.
FLOORLESS_P99_LOOKBACK_YEARS = 10
def apply_floorless_estimate_guard(df: pl.DataFrame) -> pl.DataFrame:
    """Null estimates that lack a floor area and dwarf local sale prices.

    A per-district reference price is derived from ``df`` itself: the 0.99
    quantile of ``Last known price`` over rows whose ``Date of last
    transaction`` falls in a year no earlier than
    ``CURRENT_YEAR - FLOORLESS_P99_LOOKBACK_YEARS``.

    ``Estimated current price`` is replaced by null on a row only when all of
    the following hold:

    * the estimate is present;
    * ``Total floor area (sqm)`` is null or not positive;
    * the row's district has a reference price;
    * the estimate is strictly above the larger of
      ``FLOORLESS_ESTIMATE_P99_MULT`` times the reference and
      ``FLOORLESS_ESTIMATE_MIN_CAP``.

    Rows whose district has no reference price are left as they are, and rows
    with a positive floor area are never modified here. The input must carry
    the ``_sector`` helper column. The number of nulled estimates is printed,
    and the two helper columns added along the way are removed again before
    the frame is returned.
    """
    district_key = "_district"
    reference_key = "_district_p99"

    estimate = pl.col("Estimated current price")
    floor_area = pl.col("Total floor area (sqm)")
    last_price = pl.col("Last known price")

    # The district is the sector with its trailing whitespace-plus-digits
    # group removed, e.g. "SW1W 9" -> "SW1W".
    district_expr = pl.col("_sector").str.replace(r"\s+\d+$", "").alias(district_key)

    # Reference price per district, from recent priced sales only.
    earliest_year = CURRENT_YEAR - FLOORLESS_P99_LOOKBACK_YEARS
    recent_sales = df.lazy().filter(
        last_price.is_not_null(),
        pl.col("Date of last transaction").dt.year() >= earliest_year,
    )
    reference = (
        recent_sales.group_by(district_expr)
        .agg(last_price.cast(pl.Float64).quantile(0.99).alias(reference_key))
        .collect()
    )

    # Attach the reference to every row without disturbing the row order.
    with_reference = df.with_columns(district_expr).join(
        reference, on=district_key, how="left", maintain_order="left"
    )

    has_no_floor_area = floor_area.is_null() | (floor_area <= 0)
    threshold = pl.max_horizontal(
        FLOORLESS_ESTIMATE_P99_MULT * pl.col(reference_key),
        pl.lit(FLOORLESS_ESTIMATE_MIN_CAP),
    )
    implausible = (
        estimate.is_not_null()
        & has_no_floor_area
        & pl.col(reference_key).is_not_null()
        & (estimate > threshold)
    )

    n_nulled = with_reference.select(implausible.sum()).item()
    print(f" Floorless-estimate guard: nulled {n_nulled:,} estimates")

    guarded = with_reference.with_columns(
        pl.when(implausible)
        .then(None)
        .otherwise(estimate)
        .alias("Estimated current price"),
    )
    return guarded.drop(district_key, reference_key)
def guarded_blend_estimates(
    index_est: np.ndarray,
    knn_est: np.ndarray,
    last_prices: np.ndarray,
    weight: float = KNN_BLEND_WEIGHT,
) -> np.ndarray:
    """Combine index and kNN estimates, then cap against the last sale price.

    Element-wise rules:

    * If the index estimate is not finite or not positive, the result is NaN.
    * If the kNN estimate is finite, positive and lies within
      ``[MIN_KNN_TO_INDEX_RATIO, MAX_KNN_TO_INDEX_RATIO]`` times the index
      estimate, the result is ``(1 - weight) * index + weight * knn``.
    * Otherwise the index estimate is kept unchanged.

    Finally, wherever the last price is finite and positive and both the
    resulting ceiling and the result are finite, the result is limited to
    ``last_price * MAX_ESTIMATE_TO_LAST_PRICE_RATIO``.

    Args:
        index_est: Index-adjusted estimates.
        knn_est: kNN-based estimates.
        last_prices: Last known sale prices.
        weight: Share given to the kNN estimate when blending.

    Returns:
        Float64 array of blended, capped estimates (NaN where no estimate).
    """
    idx = index_est.astype(np.float64, copy=False)
    knn = knn_est.astype(np.float64, copy=False)
    last = last_prices.astype(np.float64, copy=False)

    index_ok = np.isfinite(idx) & (idx > 0)
    knn_ok = np.isfinite(knn) & (knn > 0)

    # kNN is only trusted when it agrees with the index to within the band.
    lower_bound = idx * MIN_KNN_TO_INDEX_RATIO
    upper_bound = idx * MAX_KNN_TO_INDEX_RATIO
    blendable = knn_ok & (index_ok & (knn >= lower_bound) & (knn <= upper_bound))

    index_only = np.where(index_ok, idx, np.nan)
    mixed = (1 - weight) * idx + weight * knn
    result = np.where(index_ok & blendable, mixed, index_only)

    # A ceiling exists only for rows with a usable last price.
    last_ok = np.isfinite(last) & (last > 0)
    ceiling = np.where(last_ok, last * MAX_ESTIMATE_TO_LAST_PRICE_RATIO, np.nan)

    cappable = np.isfinite(ceiling) & np.isfinite(result)
    return np.where(cappable, np.minimum(result, ceiling), result)
def main():
    """Estimate current prices and rewrite properties.parquet.

    Command-line arguments: ``--properties`` (rewritten), ``--postcodes``
    (source of lat/lon) and ``--index`` (price index), all required paths.

    Steps, in order:

    1. Load the properties and left-join lat/lon on ``Postcode``.
    2. Drop output columns left by a previous run and derive the
       ``_``-prefixed helper columns.
    3. Index estimate: last known price times the exponential of the clipped
       log-index move between the sale date and the current date.
    4. kNN estimate: kNN median price per sqm times floor area, blended into
       the index estimate by ``guarded_blend_estimates``.
    5. Null implausible estimates: the per-sqm band check for rows with a
       floor area, ``apply_floorless_estimate_guard`` for rows without.
    6. Derive ``Est. price per sqm``, drop helper and lat/lon columns, and
       write the result back.

    The output is written to a sibling temporary file and then moved over the
    original, so a failed write leaves the existing properties.parquet intact.
    """
    parser = argparse.ArgumentParser(
        description="Augment properties.parquet with estimated current prices"
    )
    parser.add_argument(
        "--properties",
        type=Path,
        required=True,
        help="Path to properties.parquet (modified in-place)",
    )
    parser.add_argument(
        "--postcodes",
        type=Path,
        required=True,
        help="Path to postcode.parquet (for lat/lon needed by kNN)",
    )
    parser.add_argument(
        "--index", type=Path, required=True, help="Path to price_index.parquet"
    )
    args = parser.parse_args()

    print("Loading properties.parquet...")
    df = pl.read_parquet(args.properties)
    print(f" {len(df):,} rows, {len(df.columns)} columns")

    # Join lat/lon from postcode.parquet for kNN spatial queries. The row
    # order is pinned explicitly so the rewritten file keeps the input order
    # instead of relying on the join's unspecified default ordering.
    postcodes = pl.read_parquet(args.postcodes).select("Postcode", "lat", "lon")
    df = df.join(postcodes, on="Postcode", how="left", maintain_order="left")
    print(f" Joined lat/lon from {len(postcodes):,} postcodes")

    # Drop existing estimated columns if re-running
    for col in ["Estimated current price", "Est. price per sqm"]:
        if col in df.columns:
            df = df.drop(col)

    # Derive helper columns
    df = df.with_columns(
        sector_expr().alias("_sector"),
        (
            pl.col("Date of last transaction").dt.year().cast(pl.Float64)
            + (pl.col("Date of last transaction").dt.month().cast(pl.Float64) - 1.0)
            / 12.0
        ).alias("_sale_frac_year"),
        type_group_expr().alias("_type_group"),
        pl.lit(CURRENT_FRAC_YEAR).alias("_current_frac_year"),
    )

    index = pl.read_parquet(args.index)
    print(
        f" Price index: {len(index):,} rows, {index['sector'].n_unique():,} sectors, "
        f"{index['type_group'].n_unique()} type groups"
    )

    print("\nApplying repeat-sales index with fractional year interpolation...")
    df = interpolate_log_index(
        index, df, "_sector", "_type_group", "_sale_frac_year", "_log_index_sale_interp"
    )
    df = interpolate_log_index(
        index,
        df,
        "_sector",
        "_type_group",
        "_current_frac_year",
        "_log_index_current_interp",
    )

    # Compute index-adjusted estimate with cap
    has_price = (
        pl.col("Last known price").is_not_null()
        & pl.col("Postcode").is_not_null()
        & pl.col("Date of last transaction").is_not_null()
    )
    df = df.with_columns(
        pl.when(has_price)
        .then(
            pl.col("Last known price").cast(pl.Float64)
            * (pl.col("_log_index_current_interp") - pl.col("_log_index_sale_interp"))
            .clip(-MAX_LOG_ADJUSTMENT, MAX_LOG_ADJUSTMENT)
            .exp()
        )
        .otherwise(pl.lit(None))
        .alias("Estimated current price"),
    )
    n_estimated = df.filter(pl.col("Estimated current price").is_not_null()).height
    n_with_price = df.filter(has_price).height
    print(
        f" {n_estimated:,} of {n_with_price:,} properties estimated "
        f"({n_estimated / max(n_with_price, 1) * 100:.1f}%)"
    )

    # --- kNN blending ---
    print("\nBuilding kNN estimates...")
    trees = build_knn_pool(df.lazy(), index, CURRENT_FRAC_YEAR)
    lat = df["lat"].cast(pl.Float64).to_numpy()
    lon = df["lon"].cast(pl.Float64).to_numpy()
    tg = df["_type_group"].fill_null("").to_numpy()
    # Missing floor area becomes 0 so the kNN estimate is 0 and never blended.
    fa = df["Total floor area (sqm)"].cast(pl.Float64).fill_null(0.0).to_numpy()
    last_prices = (
        df["Last known price"].cast(pl.Float64).fill_null(float("nan")).to_numpy()
    )
    # Days since epoch; -1 marks rows with no transaction date.
    last_sale_dates = (
        df["Date of last transaction"]
        .dt.epoch("d")
        .fill_null(-1)
        .to_numpy()
        .astype(np.int64)
    )
    knn_psm = knn_median_psm(
        trees,
        lat,
        lon,
        tg,
        postcodes=df["Postcode"].fill_null("").to_numpy(),
        last_prices=last_prices,
        last_sale_dates=last_sale_dates,
    )
    knn_est = knn_psm * fa  # No temporal adj: ref == current
    df = df.with_columns(
        pl.Series("_knn_est", knn_est, dtype=pl.Float64),
    )

    # Blend only when kNN is close to the index estimate; otherwise keep index.
    index_est = (
        df["Estimated current price"]
        .cast(pl.Float64)
        .fill_null(float("nan"))
        .to_numpy()
    )
    blended = guarded_blend_estimates(index_est, knn_est, last_prices)
    df = df.with_columns(
        pl.Series("_index_est", index_est, dtype=pl.Float64),
        pl.Series("Estimated current price", blended, dtype=pl.Float64),
    ).with_columns(
        pl.col("Estimated current price").fill_nan(None),
    )

    # Count the rows that were actually blended, using the same conditions as
    # guarded_blend_estimates. `_index_est` holds NaN (not null) for missing
    # estimates, so it is checked for finiteness and positivity rather than
    # for null.
    n_blended = df.filter(
        pl.col("_knn_est").is_not_null()
        & pl.col("_knn_est").is_finite()
        & (pl.col("_knn_est") > 0)
        & pl.col("_index_est").is_finite()
        & (pl.col("_index_est") > 0)
        & (pl.col("_knn_est") >= pl.col("_index_est") * MIN_KNN_TO_INDEX_RATIO)
        & (pl.col("_knn_est") <= pl.col("_index_est") * MAX_KNN_TO_INDEX_RATIO)
        & pl.col("Estimated current price").is_not_null()
    ).height
    print(f" kNN blended: {n_blended:,} of {n_estimated:,} estimates")

    # Null the absolute "Estimated current price" itself when its implied
    # per-sqm is implausible (outside [MIN_COMPARABLE_PSM, MAX_COMPARABLE_PSM])
    # AND the floor area is known: these come from bulk/block transfers or
    # garbage source prices (e.g. a £207.5M "sale" on a 93 m² terrace -> a £197M
    # estimate) and are not meaningful single-dwelling values. Previously only
    # the derived per-sqm was nulled, leaving the absurd headline price visible.
    _raw_est_psm = pl.col("Estimated current price") / pl.col("Total floor area (sqm)")
    df = df.with_columns(
        pl.when(
            pl.col("Estimated current price").is_not_null()
            & pl.col("Total floor area (sqm)").is_not_null()
            & (pl.col("Total floor area (sqm)") > 0)
            & ((_raw_est_psm < MIN_COMPARABLE_PSM) | (_raw_est_psm > MAX_COMPARABLE_PSM))
        )
        .then(None)
        .otherwise(pl.col("Estimated current price"))
        .alias("Estimated current price"),
    )

    # Floor-area-less rows escape the per-sqm guard above entirely; cap them
    # against their district's recent sale prices instead (see
    # apply_floorless_estimate_guard). Must run before temp columns
    # (_sector) are dropped.
    df = apply_floorless_estimate_guard(df)

    # Derive estimated price per sqm where both estimated price and floor area
    # exist. Now that the implausible-psm estimates are nulled above, the band
    # filter here mainly guards the floor-area>0 case. (The floorless guard
    # never touches floor-area-present rows, so this derivation is unaffected.)
    _est_psm = pl.col("Estimated current price") / pl.col("Total floor area (sqm)")
    df = df.with_columns(
        pl.when(
            pl.col("Estimated current price").is_not_null()
            & pl.col("Total floor area (sqm)").is_not_null()
            & (pl.col("Total floor area (sqm)") > 0)
            & (_est_psm >= MIN_COMPARABLE_PSM)
            & (_est_psm <= MAX_COMPARABLE_PSM)
        )
        .then(_est_psm.round(0).cast(pl.Int32, strict=False))
        .otherwise(None)
        .alias("Est. price per sqm"),
    )

    # Drop all temporary columns and joined lat/lon (those belong in postcode.parquet)
    temp_cols = [c for c in df.columns if c.startswith("_") or c.startswith("log_idx_")]
    df = df.drop(temp_cols).drop("lat", "lon")

    # Write to a sibling temp file and swap it in, so an interrupted or failed
    # write cannot leave a truncated properties.parquet behind. The temp file
    # lives in the same directory so the final move stays on one filesystem.
    tmp_path = args.properties.with_name(args.properties.name + ".tmp")
    try:
        df.write_parquet(tmp_path)
        tmp_path.replace(args.properties)
    finally:
        # No-op after a successful replace; cleans up after a failed write.
        tmp_path.unlink(missing_ok=True)

    size_mb = args.properties.stat().st_size / (1024 * 1024)
    print(f"\nWrote {args.properties} ({size_mb:.1f} MB)")
    print(
        f" {len(df):,} rows, {len(df.columns)} columns (including 'Estimated current price')"
    )
# Run the estimation step only when this file is executed as a script, so
# importing the module (e.g. to reuse the guard helpers) has no side effects.
if __name__ == "__main__":
    main()