From cf39ad754ec290bc045f128f2fdd3ff0eaf8c063 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Thu, 11 Jun 2026 20:15:31 +0100 Subject: [PATCH] Fix enrich listing --- pipeline/transform/merge.py | 249 +++++++++++++++++- .../price_estimation/test_estimate.py | 104 ++++++++ pipeline/transform/test_merge.py | 182 ++++++++++++- 3 files changed, 529 insertions(+), 6 deletions(-) create mode 100644 pipeline/transform/price_estimation/test_estimate.py diff --git a/pipeline/transform/merge.py b/pipeline/transform/merge.py index 8cad45e..0130408 100644 --- a/pipeline/transform/merge.py +++ b/pipeline/transform/merge.py @@ -1098,6 +1098,17 @@ def _postcode_outcode_expr(column: str) -> pl.Expr: ) +_OUTCODE_RE = re.compile(r"^([A-Z]{1,2}\d[A-Z\d]?)\d[A-Z]{2}$") + + +def _outcode_of(postcode: str | None) -> str | None: + """Outcode of a compact normalised postcode ("BR15RW" -> "BR1").""" + if not postcode: + return None + match = _OUTCODE_RE.match(postcode) + return match.group(1) if match else None + + def _canonical_epc_property_type_expr() -> pl.Expr: bad_built_form = pl.col("built_form").is_null() | pl.col("built_form").is_in( ["NO DATA!", "Not Recorded"] @@ -1159,6 +1170,66 @@ def _has_number(address: str | None) -> bool: return bool(address and _NUMBER_RE.search(address)) +def _enum_bonus( + left: str | None, right: str | None, *, exact: float, mismatch: float +) -> float: + if not left or not right: + return 0.0 + return exact if left == right else mismatch + + +def _ratio_bonus( + left: float | int | None, right: float | int | None, pct: float, cap: float +) -> float: + if left is None or right is None: + return 0.0 + try: + left_f = float(left) + right_f = float(right) + except (TypeError, ValueError): + return 0.0 + if left_f <= 0 or right_f <= 0: + return 0.0 + rel = abs(left_f - right_f) / max(left_f, right_f) + if rel > pct: + return 0.0 + return cap * (1.0 - rel / pct) + + +def _rooms_bonus(left: int | None, right: int | None) -> float: + if left is None or 
right is None: + return 0.0 + try: + diff = abs(int(left) - int(right)) + except (TypeError, ValueError): + return 0.0 + if diff == 0: + return 4.0 + if diff == 1: + return 2.0 + return 0.0 + + +def _street_only_address(address: str) -> str: + """The street/locality part of a normalised address: digit-bearing tokens + (house numbers, flat numbers, including letter suffixes like 8A) removed.""" + return " ".join(token for token in address.split() if not _NUMBER_RE.search(token)) + + +def _is_specific_street_query(query: str) -> bool: + """Whether a number-less listing address is specific enough for the + street-level fallback. token_set_ratio scores 100 whenever the query's + tokens are a subset of the candidate's, so a one-token query (a bare named + house like "KINGSWOOD") would match any street containing that word; + require at least two substantive tokens ("OLDSTEAD ROAD ...") instead.""" + substantive = [ + token + for token in query.split() + if token not in _LISTED_NAME_STOP_WORDS and len(token) >= 3 + ] + return len(substantive) >= 2 + + def _normalize_uprn(value: object) -> str | None: """Canonical UPRN string (digits only) or None. @@ -1197,6 +1268,8 @@ def _best_listing_match( `fuzzy_join._score_bucket`. A house number additionally lowers the score threshold and (via `_address_score`) permits token_set scoring; a number-less address scores on token_sort only and must match the street almost exactly. + The direct-EPC path layers a street-level fallback on top of this strict + matcher — see `_best_street_epc_fallback`. ``addressed_fields`` names the candidate columns to fuzzy-match against (a candidate may carry both a register and an EPC address). Returns @@ -1243,6 +1316,120 @@ def _best_listing_match( return best, float(best_score), "address", best_field +# Ranking bonuses for the street-level direct-EPC fallback. 
# Ranking bonuses for the street-level direct-EPC fallback. A certificate in
# the listing's own postcode unit is the nearest segment of the street, and a
# certificate sharing a house-number token with the listing (e.g. listing
# "751 753 Cranbrook Road" vs certificate "751 Cranbrook Road", which fails the
# strict set-equality gate) is almost certainly the right property — both
# should beat a bare attribute-agreement win.
_STREET_FALLBACK_SAME_POSTCODE_BONUS = 3.0
_STREET_FALLBACK_NUMBER_OVERLAP_BONUS = 8.0


def _best_street_epc_fallback(
    listing: dict,
    outcode_streets: dict[str, list[dict]] | None,
    outcode_noise_tokens: set[str],
    street_score_cache: dict[tuple[str, str], list[tuple[int, str]]],
) -> tuple[dict, float, str, None] | None:
    """Street-level direct-EPC fallback for listings the strict matcher missed.

    Most scraped listings publish a street-level address only, so the strict
    matcher cannot pair them with the (numbered) EPC register. This fallback
    instead picks the best certificate on the same street within the listing's
    OUTCODE, because long streets span several postcode units.

    Steps:
      1. Strip number tokens from the listing address; give up if nothing is
         left or the remainder is not specific enough.
      2. Find the outcode's street keys that (a) share with the query at least
         one token that is not an outcode noise token and (b) reach
         ``_LISTING_MATCH_MIN_SCORE_WITHOUT_NUMBERS`` on token_set_ratio.
      3. Rank every certificate on those streets by street score plus
         attribute agreement (property type, floor area, habitable rooms), a
         same-postcode-unit bonus and a shared-house-number bonus. Ties keep
         the first certificate encountered.

    Args:
        listing: listing row; reads ``_listing_match_address``,
            ``_listing_match_postcode``, ``_listing_outcode`` and the
            ``_actual_*`` attribute columns.
        outcode_streets: street key -> EPC rows for the listing's outcode, or
            None/empty when the outcode has no indexed certificates.
        outcode_noise_tokens: tokens that may not serve as the anchor token.
        street_score_cache: memo of step 2 keyed by (outcode, query street);
            mutated in place so listings on the same street share the scan.

    Returns:
        ``(certificate_row, street_score, "street", None)`` — the distinct
        "street" label marks the result as street-representative rather than
        property-exact — or None when nothing qualifies.
    """
    address = listing.get("_listing_match_address")
    if not address or not outcode_streets:
        return None
    street_query = _street_only_address(address)
    if not street_query:
        return None
    if not _is_specific_street_query(street_query):
        return None

    postcode = listing.get("_listing_match_postcode")
    outcode = listing.get("_listing_outcode") or _outcode_of(postcode) or ""
    memo_key = (outcode, street_query)
    scored_streets = street_score_cache.get(memo_key)
    if scored_streets is None:
        wanted_tokens = set(street_query.split())
        scored_streets = []
        for street_key in outcode_streets:
            # The shared token must not be a locality suffix of this outcode,
            # otherwise a town-only address would subset-inflate onto an
            # arbitrary street.
            anchors = (wanted_tokens & set(street_key.split())) - outcode_noise_tokens
            if not anchors:
                continue
            similarity = fuzz.token_set_ratio(street_query, street_key)
            if similarity >= _LISTING_MATCH_MIN_SCORE_WITHOUT_NUMBERS:
                scored_streets.append((similarity, street_key))
        street_score_cache[memo_key] = scored_streets

    own_numbers = set(_NUMBER_RE.findall(address))
    property_type = listing.get("_actual_property_type")
    floor_area = listing.get("_actual_total_floor_area")
    room_count = listing.get("_actual_number_habitable_rooms")

    winner: dict | None = None
    winner_total = float("-inf")
    winner_similarity = 0
    for similarity, street_key in scored_streets:
        for row in outcode_streets[street_key]:
            # Bonuses are added in a fixed order so float totals (and hence
            # tie-breaks) are reproducible.
            total = float(similarity)
            total += _enum_bonus(
                property_type,
                row.get("_direct_epc_canonical_property_type"),
                exact=6.0,
                mismatch=-6.0,
            )
            total += _ratio_bonus(
                floor_area,
                row.get("_direct_total_floor_area"),
                pct=0.12,
                cap=8.0,
            )
            total += _rooms_bonus(
                room_count, row.get("_direct_number_habitable_rooms")
            )
            if postcode and row.get("_direct_epc_match_postcode") == postcode:
                total += _STREET_FALLBACK_SAME_POSTCODE_BONUS
            if own_numbers:
                row_numbers = set(
                    _NUMBER_RE.findall(row.get("_direct_epc_match_address") or "")
                )
                if own_numbers & row_numbers:
                    total += _STREET_FALLBACK_NUMBER_OVERLAP_BONUS
            if total > winner_total:
                winner, winner_total, winner_similarity = row, total, similarity

    if winner is None:
        return None
    return winner, float(winner_similarity), "street", None


def _street_noise_tokens(street_keys: dict[str, list[dict]]) -> set[str]:
    """Tokens occurring in at least ``max(2, len(street_keys) // 4)`` of the
    given street keys (each key counts a token once)."""
    threshold = max(2, len(street_keys) // 4)
    occurrences: dict[str, int] = {}
    for street_key in street_keys:
        for token in set(street_key.split()):
            occurrences[token] = occurrences.get(token, 0) + 1
    return {token for token, seen in occurrences.items() if seen >= threshold}


def _index_epc_streets(
    epc_candidates: pl.DataFrame,
) -> tuple[dict[str, dict[str, list[dict]]], dict[str, set[str]]]:
    """Index EPC candidate rows for the street-level fallback.

    Returns ``(streets, noise_tokens)``.

    ``streets`` maps outcode -> street key -> rows, where the street key is the
    row's ``_direct_epc_match_address`` with number tokens stripped. Rows with
    no outcode, no address, or an empty street key are left out.

    ``noise_tokens`` maps outcode -> the tokens found in at least
    ``max(2, street_key_count // 4)`` of that outcode's street keys (roughly a
    quarter, never fewer than two). Those are locality suffixes (LONDON,
    SURREY, the town name) rather than street names, and a fallback match must
    be anchored by at least one token that is NOT one of them — otherwise a
    town-only listing address ("COULSDON SURREY") token_set-inflates to 100
    against any street key carrying the same suffix.
    """
    streets: dict[str, dict[str, list[dict]]] = {}
    for row in epc_candidates.iter_rows(named=True):
        outcode = row.get("_direct_epc_outcode")
        address = row.get("_direct_epc_match_address")
        if outcode and address:
            street_key = _street_only_address(address)
            if street_key:
                streets.setdefault(outcode, {}).setdefault(street_key, []).append(row)

    noise_tokens: dict[str, set[str]] = {
        outcode: _street_noise_tokens(by_street)
        for outcode, by_street in streets.items()
    }
    return streets, noise_tokens
"""Tests for the floor-area-less estimate guard in estimate.py.

The per-sqm plausibility guard cannot fire when floor area is null/zero, which
let commercial blocks misfiled as dwellings keep absurd headline estimates
(e.g. a GBP 175M "Detached" in SW1W). apply_floorless_estimate_guard nulls a
floorless estimate only when it exceeds max(FLOORLESS_ESTIMATE_P99_MULT x the
district's recent p99 sale price, FLOORLESS_ESTIMATE_MIN_CAP), and leaves
rows it cannot judge (no recent district sales) alone.
"""

from datetime import date

import polars as pl

from pipeline.transform.price_estimation.estimate import (
    FLOORLESS_P99_LOOKBACK_YEARS,
    apply_floorless_estimate_guard,
)
from pipeline.transform.price_estimation.utils import CURRENT_YEAR

# A sale date inside the p99 look-back window ...
RECENT = date(CURRENT_YEAR - 1, 6, 1)
# ... and one comfortably outside it.
STALE = date(CURRENT_YEAR - FLOORLESS_P99_LOOKBACK_YEARS - 5, 6, 1)

# Columns the guard reads, in the positional order used by the row tuples.
_GUARD_SCHEMA = {
    "id": pl.Int64,
    "_sector": pl.String,
    "Estimated current price": pl.Float64,
    "Total floor area (sqm)": pl.Float64,
    "Last known price": pl.Float64,
    "Date of last transaction": pl.Date,
}


def _guard_input(rows):
    """Build the guard's input frame from (id, sector, estimate, floor_area,
    last_price, last_date) tuples. Rows with a null estimate are "pool" rows:
    they only feed the per-district p99 reference."""
    return pl.DataFrame(rows, schema=_GUARD_SCHEMA, orient="row")


def _estimate_for(result: pl.DataFrame, row_id: int):
    """Estimate of the row whose ``id`` equals ``row_id`` (first hit)."""
    selected = result.filter(pl.col("id") == row_id)
    estimates = selected["Estimated current price"]
    return estimates[0]


def test_floorless_guard_nulls_and_keeps_the_right_rows():
    # SW1W pool: 5 recent sales at 3M -> district p99 = 3M, cap = 6M.
    sw1w_pool = [
        (100 + i, "SW1W 9", None, None, 3_000_000.0, RECENT) for i in range(5)
    ]
    # ZZ1 pool: cheap district, p99 = 500k -> cap = max(1M, 2M) = 2M.
    zz1_pool = [(200 + i, "ZZ1 4", None, None, 500_000.0, RECENT) for i in range(5)]

    rows = [
        *sw1w_pool,
        # 175M floorless estimate, 29x the 6M cap -> nulled.
        (1, "SW1W 9", 175_000_000.0, None, None, None),
        # Zero floor area counts as floorless (psm guard can't fire) -> nulled.
        (2, "SW1W 8", 175_000_000.0, 0.0, None, None),
        # 5M floorless is under the 2 x p99 cap -> kept.
        (3, "SW1W 9", 5_000_000.0, None, None, None),
        # Floor area PRESENT: never touched by this guard, however absurd
        # (the per-sqm guard owns that case).
        (4, "SW1W 9", 175_000_000.0, 93.0, None, None),
        *zz1_pool,
        # Genuine mansion in a cheap district: above 2 x p99 but below the
        # absolute 2M floor -> kept.
        (5, "ZZ1 4", 1_500_000.0, None, None, None),
        # Above both the absolute floor and 2 x p99 -> nulled.
        (6, "ZZ1 4", 2_500_000.0, None, None, None),
        # XX9's only sale is outside the look-back window -> null p99 ->
        # cannot judge -> kept, even at 50M.
        (300, "XX9 1", None, None, 4_000_000.0, STALE),
        (7, "XX9 1", 50_000_000.0, None, None, None),
        # No sector at all -> no district reference -> kept.
        (8, None, 50_000_000.0, None, None, None),
    ]

    result = apply_floorless_estimate_guard(_guard_input(rows))

    # Expected estimate per judged row id; None means "nulled by the guard".
    expected = {
        1: None,
        2: None,
        3: 5_000_000.0,
        4: 175_000_000.0,
        5: 1_500_000.0,
        6: None,
        7: 50_000_000.0,
        8: 50_000_000.0,
    }
    for row_id, wanted in expected.items():
        actual = _estimate_for(result, row_id)
        if wanted is None:
            assert actual is None
        else:
            assert actual == wanted


def test_floorless_guard_preserves_schema_and_rows():
    """The guard adds no columns, drops no rows, and leaves non-estimate
    columns untouched (it runs in-pipeline before temp-column dropping)."""
    pool_row = (1, "SW1W 9", None, None, 3_000_000.0, RECENT)
    absurd_row = (2, "SW1W 9", 175_000_000.0, None, None, None)
    df = _guard_input([pool_row, absurd_row])

    result = apply_floorless_estimate_guard(df)

    estimate_column = "Estimated current price"
    assert result.columns == df.columns
    assert len(result) == len(df)
    assert result["id"].to_list() == df["id"].to_list()
    assert result.drop(estimate_column).equals(df.drop(estimate_column))
def test_match_direct_epc_street_fallback_matches_numberless_listing() -> None:
    # A street-level listing address (the Rightmove norm: no house number, no
    # UPRN) cannot pass the strict number gate, but must still pick up
    # street-representative EPC facts from a same-street certificate, labelled
    # with the lower-confidence "street" method.
    listings = _listing_matches([{"_listing_match_address": "EXAMPLE ROAD BROMLEY"}])
    certificates = _direct_epc_candidates(
        [{"_direct_epc_match_address": "7 EXAMPLE ROAD"}]
    )

    matches = _match_direct_epc(listings, certificates)

    assert matches.height == 1
    assert matches["_direct_epc_match_method"].to_list() == ["street"]


def test_match_direct_epc_street_fallback_prefers_attribute_agreement() -> None:
    # Every same-street certificate ties on street similarity, so the listing's
    # attributes (floor area here) must pick the most plausible one.
    listing_row = {
        "_listing_idx": 0,
        "_listing_match_address": "EXAMPLE ROAD BROMLEY",
        "_listing_match_postcode": "AA11AA",
        "_listing_uprn": None,
        "_actual_total_floor_area": 78.0,
    }
    listings = pl.DataFrame(
        [listing_row],
        schema={**_LISTING_MATCH_SCHEMA, "_actual_total_floor_area": pl.Float64},
    )
    # 150 sqm is far from the listing's 78 sqm; 80 sqm is within tolerance.
    much_larger = {
        "_direct_epc_match_address": "7 EXAMPLE ROAD",
        "_direct_epc_address": "7, Example Road",
        "_direct_total_floor_area": 150.0,
    }
    similar_size = {
        "_direct_epc_row": 1,
        "_direct_epc_match_address": "9 EXAMPLE ROAD",
        "_direct_epc_address": "9, Example Road",
        "_direct_total_floor_area": 80.0,
    }

    matches = _match_direct_epc(
        listings, _direct_epc_candidates([much_larger, similar_size])
    )

    assert matches.height == 1
    assert matches["_direct_epc_address"].to_list() == ["9, Example Road"]
    assert matches["_direct_epc_match_method"].to_list() == ["street"]


def test_match_direct_epc_street_fallback_spans_postcodes_within_outcode() -> None:
    # Long streets cross postcode units. A street-only listing whose own
    # postcode has no certificate must still pick up a same-street certificate
    # from a sibling postcode in the same outcode.
    listing = {
        "_listing_match_address": "EXAMPLE ROAD BROMLEY",
        "_listing_match_postcode": "AA12ZZ",
    }
    sibling_postcode_certificate = {
        "_direct_epc_match_address": "7 EXAMPLE ROAD",
        "_direct_epc_match_postcode": "AA11AA",
    }

    matches = _match_direct_epc(
        _listing_matches([listing]),
        _direct_epc_candidates([sibling_postcode_certificate]),
    )

    assert matches.height == 1
    assert matches["_direct_epc_match_method"].to_list() == ["street"]


def test_match_direct_epc_street_fallback_prefers_own_postcode_segment() -> None:
    # Within one street, the certificate in the listing's own postcode unit is
    # the nearest segment and must win over an equal candidate further along.
    listings = _listing_matches([{"_listing_match_address": "EXAMPLE ROAD BROMLEY"}])
    further_along = {
        "_direct_epc_match_address": "7 EXAMPLE ROAD",
        "_direct_epc_address": "7, Example Road",
        "_direct_epc_match_postcode": "AA12ZZ",
    }
    own_segment = {
        "_direct_epc_row": 1,
        "_direct_epc_match_address": "9 EXAMPLE ROAD",
        "_direct_epc_address": "9, Example Road",
        "_direct_epc_match_postcode": "AA11AA",
    }

    matches = _match_direct_epc(
        listings, _direct_epc_candidates([further_along, own_segment])
    )

    assert matches.height == 1
    assert matches["_direct_epc_address"].to_list() == ["9, Example Road"]


def test_match_direct_epc_street_fallback_recovers_numbered_listing() -> None:
    # A numbered listing whose house number has no certificate (number sets
    # disjoint, so the strict gate skips every candidate) still picks up a
    # street-representative certificate via the fallback.
    listings = _listing_matches(
        [{"_listing_match_address": "17 EXAMPLE ROAD BROMLEY"}]
    )
    certificates = _direct_epc_candidates(
        [{"_direct_epc_match_address": "9 EXAMPLE ROAD"}]
    )

    matches = _match_direct_epc(listings, certificates)

    assert matches.height == 1
    assert matches["_direct_epc_match_method"].to_list() == ["street"]


def test_match_direct_epc_street_fallback_rejects_town_only_address() -> None:
    # A town-only listing address ("COULSDON SURREY") shares only the locality
    # suffix that most street keys in the outcode carry; without a street-name
    # anchor it must not subset-inflate onto an arbitrary street.
    numbered_streets = [
        ("49", "LACKFORD ROAD"),
        ("12", "CHIPSTEAD VALLEY ROAD"),
        ("3", "WINDERMERE ROAD"),
    ]
    certificates = []
    for row_index, (number, street) in enumerate(numbered_streets):
        certificates.append(
            {
                "_direct_epc_row": row_index,
                "_direct_epc_match_address": f"{number} {street} SURREY COULSDON",
            }
        )

    matches = _match_direct_epc(
        _listing_matches([{"_listing_match_address": "COULSDON SURREY"}]),
        _direct_epc_candidates(certificates),
    )

    assert matches.height == 0


def test_match_direct_epc_street_fallback_rejects_single_token_query() -> None:
    # token_set_ratio scores 100 whenever the query's tokens subset the
    # candidate's, so a bare one-token name must not street-match anything.
    listings = _listing_matches([{"_listing_match_address": "KINGSWOOD"}])
    certificates = _direct_epc_candidates(
        [{"_direct_epc_match_address": "4 KINGSWOOD ROAD"}]
    )

    matches = _match_direct_epc(listings, certificates)

    assert matches.height == 0


def test_match_direct_epc_street_fallback_rejects_different_street() -> None:
    # The fallback is street-identity within the outcode, not "anything in the
    # outcode": a certificate on another street must not match.
    listings = _listing_matches([{"_listing_match_address": "OLDSTEAD ROAD BROMLEY"}])
    certificates = _direct_epc_candidates(
        [{"_direct_epc_match_address": "5 CAMBRIDGE ROAD"}]
    )

    matches = _match_direct_epc(listings, certificates)

    assert matches.height == 0