"""Shared utilities for price estimation modules.""" from datetime import date import numpy as np import polars as pl CURRENT_YEAR = 2026 _today = date.today() CURRENT_FRAC_YEAR = _today.year + (_today.month - 1) / 12 # Cap on log(index_ratio) to prevent wild estimates from thin sectors MAX_LOG_ADJUSTMENT = 3.0 # ~20x max price change TERRACE_TYPES = [ "Mid-Terrace", "End-Terrace", "Enclosed Mid-Terrace", "Enclosed End-Terrace", "Terraced", ] FLAT_TYPES = ["Flats/Maisonettes"] TYPE_GROUPS = ["Detached", "Semi-Detached", "Terraced", "Flats", "Bungalow"] SHRINKAGE_K = 50 def type_group_expr(): """Polars expression: Property type -> type_group.""" return ( pl.when(pl.col("Property type").is_in(TERRACE_TYPES)) .then(pl.lit("Terraced")) .when(pl.col("Property type").is_in(FLAT_TYPES)) .then(pl.lit("Flats")) .when(pl.col("Property type") == "Bungalow") .then(pl.lit("Bungalow")) .when(pl.col("Property type").is_in(["Detached", "Semi-Detached"])) .then(pl.col("Property type")) .otherwise(pl.lit(None)) .alias("type_group") ) def sector_expr(): """Polars expression: Postcode -> sector (drop last 2 chars, strip).""" return ( pl.col("Postcode") .str.slice(0, pl.col("Postcode").str.len_chars() - 2) .str.strip_chars() .alias("sector") ) def hierarchy_keys(sector: str) -> tuple[str, str]: """Return (district, area) for a sector string.""" district = sector.rsplit(" ", 1)[0] if " " in sector else sector area = "" for ch in district: if ch.isalpha(): area += ch else: break return district, area NON_REF_TYPES = ["Terraced", "Semi-Detached", "Flats", "Bungalow"] def build_hedonic_features(df: pl.DataFrame) -> np.ndarray: """Build hedonic feature matrix: log(floor_area) + 4 type dummies (ref: Detached).""" fa = df["Total floor area (sqm)"].to_numpy().astype(np.float32) log_fa = np.log(np.maximum(fa, 1.0)).reshape(-1, 1) tg = df["type_group"].to_numpy() parts = [log_fa] for t in NON_REF_TYPES: parts.append((tg == t).astype(np.float32).reshape(-1, 1)) return np.hstack(parts) def interpolate_log_index( index: pl.DataFrame, df: pl.DataFrame, sector_col: str, type_col: str, frac_year_col: str, output_alias: str, ) -> pl.DataFrame: """Join and interpolate log_index at fractional years. For frac_year 2019.75: joins index at year=2019 and year=2020, then linearly interpolates: 0.25*idx_2019 + 0.75*idx_2020. Falls back to floor or ceil when the other is missing. """ floor_col = f"_{output_alias}_floor" ceil_col = f"_{output_alias}_ceil" floor_year = f"_{output_alias}_floor_year" ceil_year = f"_{output_alias}_ceil_year" frac_col = f"_{output_alias}_frac" df = df.with_columns( pl.col(frac_year_col).floor().cast(pl.Int32).alias(floor_year), pl.col(frac_year_col).ceil().cast(pl.Int32).alias(ceil_year), (pl.col(frac_year_col) - pl.col(frac_year_col).floor()).alias(frac_col), ) df = join_type_stratified_index( df, index, sector_col, type_col, floor_year, floor_col ) df = join_type_stratified_index( df, index, sector_col, type_col, ceil_year, ceil_col ) # Interpolate: (1-frac)*floor + frac*ceil, with fallbacks df = df.with_columns( pl.when(pl.col(floor_col).is_not_null() & pl.col(ceil_col).is_not_null()) .then( (1.0 - pl.col(frac_col)) * pl.col(floor_col) + pl.col(frac_col) * pl.col(ceil_col) ) .when(pl.col(floor_col).is_not_null()) .then(pl.col(floor_col)) .when(pl.col(ceil_col).is_not_null()) .then(pl.col(ceil_col)) .otherwise(pl.lit(None)) .alias(output_alias), ).drop(floor_col, ceil_col, floor_year, ceil_year, frac_col) return df def extract_centroids(input_path) -> dict[str, tuple[float, float]]: """Compute mean lat/lon per postcode sector.""" print("Computing sector centroids...") df = ( pl.scan_parquet(input_path) .select("Postcode", "lat", "lon") .filter(pl.col("Postcode").is_not_null(), pl.col("lat").is_not_null()) .with_columns(sector_expr()) .group_by("sector") .agg(pl.col("lat").mean(), pl.col("lon").mean()) .collect() ) centroids = {} for row in df.iter_rows(named=True): centroids[row["sector"]] = (row["lat"], row["lon"]) print(f" {len(centroids):,} sector centroids") return centroids def join_type_stratified_index( df: pl.DataFrame, index: pl.DataFrame, sector_col: str, type_col: str, year_col: str, output_alias: str, ) -> pl.DataFrame: """Join price index with typed->All fallback. Returns df with `output_alias` column.""" idx_typed = index.filter(pl.col("type_group") != "All") idx_all = index.filter(pl.col("type_group") == "All") _typed = f"_{output_alias}_typed" _all = f"_{output_alias}_all" df = df.join( idx_typed.select( "sector", "type_group", "year", pl.col("log_index").alias(_typed) ), left_on=[sector_col, type_col, year_col], right_on=["sector", "type_group", "year"], how="left", ).join( idx_all.select("sector", "year", pl.col("log_index").alias(_all)), left_on=[sector_col, year_col], right_on=["sector", "year"], how="left", ) df = df.with_columns( pl.col(_typed).fill_null(pl.col(_all)).alias(output_alias), ).drop(_typed, _all) return df