import re
import shutil
import tempfile
from collections import Counter
from collections.abc import Sequence
from concurrent.futures import ProcessPoolExecutor
from os import cpu_count
from pathlib import Path

import polars as pl
from thefuzz import fuzz
from tqdm import tqdm

from pipeline.local_temp import local_tmp_dir
from pipeline.utils.normalize import uppercase_alnum_key_expr

# A house-number token includes any letter suffix: 8A, 8B and plain 8 are
# three different properties on the same street, so digit-only extraction
# (which collapsed all three to "8") is not enough. Addresses are passed
# through normalize_address_key first, so tokens are uppercase and
# space-separated and [A-Z] suffices for the suffix.
_NUMBER_RE = re.compile(r"\d+[A-Z]?")

# A single-letter flat designator ("FLAT B", "APARTMENT C") is a house-number-
# grade disambiguator with no digit in it: without this, FLAT B and FLAT D in
# the same building scored ~96 and cross-matched.
_FLAT_LETTER_RE = re.compile(r"\b(?:FLAT|APARTMENT|APT|UNIT) ([A-Z])\b")

# Pattern source (not compiled): it is handed to polars' ``str.contains``.
_POSTCODE_RE = r"^[A-Z]{1,2}\d[A-Z\d]?\d[A-Z]{2}$"

# A house number is a strong disambiguator, so a numbered, number-compatible
# pair may match on a lower address-similarity score than a number-less one
# (named houses / flats by building name), which must match almost exactly to
# be trusted. Mirrors merge.py's listings convention.
MIN_FUZZY_SCORE = 82
MIN_FUZZY_SCORE_WITHOUT_NUMBERS = 90

# A score reached only through an address VARIANT (locality appended /
# secondary address lines dropped) accepts a match the primary strings alone
# would reject, so it must clear a near-exact bar: in the miss audit >99% of
# genuine variant recoveries scored 100, while the rare false variant matches
# scored in the 80s.
MIN_VARIANT_SCORE = 90

# Tokens that mark a sub-unit of a building. A variant whose added/dropped
# tokens include one of these could score a single flat's certificate as if it
# were the whole building, so such variants are inadmissible.
_FLAT_TOKENS = {
    "FLAT",
    "FLATS",
    "APARTMENT",
    "APT",
    "UNIT",
    "MAISONETTE",
    "STUDIO",
    "ROOM",
}


def normalize_address_key(s: pl.Expr) -> pl.Expr:
    """Return the normalized address key for ``s``, or null if it has no letter.

    The expression is passed through ``uppercase_alnum_key_expr`` (defined
    elsewhere in the project); the result is kept only when it contains at
    least one ``A-Z`` character, otherwise it is replaced by null.
    """
    normalized = uppercase_alnum_key_expr(s)
    return pl.when(normalized.str.contains(r"[A-Z]")).then(normalized).otherwise(None)


def normalize_postcode_key(s: pl.Expr) -> pl.Expr:
    """Return a compact uppercase postcode key, or null if it is malformed.

    The value is cast to string, uppercased and stripped of every character
    outside ``A-Z0-9`` (so internal spaces disappear); anything that then
    fails ``_POSTCODE_RE`` becomes null.
    """
    normalized = (
        s.cast(pl.String)
        .str.to_uppercase()
        .str.replace_all(r"[^A-Z0-9]+", "")
        .str.strip_chars()
    )
    return (
        pl.when(normalized.str.contains(_POSTCODE_RE)).then(normalized).otherwise(None)
    )


def _group_by_postcode(
    frame: pl.DataFrame,
    idx_col: str,
    postcode_col: str,
    address_col: str,
    variant_cols: Sequence[str],
) -> dict[str, list[tuple[int, str, tuple[str, ...]]]]:
    """Bucket rows of ``frame`` by postcode for per-postcode pairwise scoring.

    Rows whose address or postcode is null are skipped. Each entry is
    ``(row index, primary address, admissible variants)``, with the variants
    already filtered through ``_admissible_variants``.
    """
    buckets: dict[str, list[tuple[int, str, tuple[str, ...]]]] = {}
    for idx, postcode, address, *variants in zip(
        frame[idx_col],
        frame[postcode_col],
        frame[address_col],
        *(frame[name] for name in variant_cols),
    ):
        if address is not None and postcode is not None:
            buckets.setdefault(postcode, []).append(
                (idx, address, _admissible_variants(address, variants))
            )
    return buckets


def fuzzy_join_on_postcode(
    left: pl.LazyFrame,
    right: pl.LazyFrame,
    left_address_col: str,
    right_address_col: str,
    left_postcode_col: str,
    right_postcode_col: str,
    min_score: int = MIN_FUZZY_SCORE,
    min_score_without_numbers: int = MIN_FUZZY_SCORE_WITHOUT_NUMBERS,
    left_variant_cols: Sequence[str] = (),
    right_variant_cols: Sequence[str] = (),
) -> pl.LazyFrame:
    """Fuzzy join two LazyFrames by matching addresses within postcode buckets.

    Sinks each side to a temporary parquet file so the upstream pipeline
    executes only once. The matching phase collects only the narrow columns
    it needs (index, address, postcode, plus any variant columns) via
    projection pushdown, and the final join reads the remaining columns
    lazily.

    ``left_variant_cols`` / ``right_variant_cols`` name alternative address
    columns for the same property (e.g. the EPC's first address line without
    its locality suffix, or the price-paid address with its locality
    appended). A pair is scored as the best token_sort_ratio over all
    admissible variant combinations: source registers frequently disagree
    only on a trailing village/locality token, which alone drags short
    addresses below the match threshold. The number-compatibility gate is
    always evaluated on the primary addresses, and `_admissible_variants`
    rejects any variant whose added/dropped tokens carry digits or flat
    designators, so a variant can never bypass the gate or score a single
    flat as its whole building. Variant-only scores must clear
    ``MIN_VARIANT_SCORE``.

    Candidate pairs are assigned greedily, one-to-one, from the best pair
    downward. The ordering is total (score, literal equality, left index,
    right index), so the result does not depend on the order in which rows
    happen to be scored.

    Returns a LazyFrame with all left and right columns, plus a
    ``_match_score`` (UInt8) audit column holding the token_sort_ratio of the
    accepted match (exact matches score 100). Unmatched rows have null right
    columns and a null score.
    """
    tmpdir = tempfile.mkdtemp(prefix="fuzzy_join_", dir=local_tmp_dir())
    left_path = Path(tmpdir) / "left.parquet"
    right_path = Path(tmpdir) / "right.parquet"

    left_variant_names = [f"_left_variant_{i}" for i in range(len(left_variant_cols))]
    right_variant_names = [
        f"_right_variant_{i}" for i in range(len(right_variant_cols))
    ]

    try:
        # Materialise each side exactly once, with a row index, to temp parquet.
        left.with_row_index("_left_idx").sink_parquet(left_path)
        right.with_row_index("_right_idx").sink_parquet(right_path)

        # Collect only the narrow columns needed for matching (projection pushdown).
        left_match = (
            pl.scan_parquet(left_path)
            .select(
                "_left_idx",
                normalize_address_key(pl.col(left_address_col)).alias("_left_address"),
                normalize_postcode_key(pl.col(left_postcode_col)).alias(
                    "_left_postcode"
                ),
                *(
                    normalize_address_key(pl.col(col)).alias(name)
                    for col, name in zip(left_variant_cols, left_variant_names)
                ),
            )
            .collect(engine="streaming")
        )
        right_match = (
            pl.scan_parquet(right_path)
            .select(
                "_right_idx",
                normalize_address_key(pl.col(right_address_col)).alias(
                    "_right_address"
                ),
                normalize_postcode_key(pl.col(right_postcode_col)).alias(
                    "_right_postcode"
                ),
                *(
                    normalize_address_key(pl.col(col)).alias(name)
                    for col, name in zip(right_variant_cols, right_variant_names)
                ),
            )
            # One candidate per distinct (address, postcode) on the right.
            .unique(subset=["_right_address", "_right_postcode"], keep="first")
            .collect(engine="streaming")
        )

        # Group both sides by postcode for fast lookup.
        right_by_postcode = _group_by_postcode(
            right_match,
            "_right_idx",
            "_right_postcode",
            "_right_address",
            right_variant_names,
        )
        left_by_postcode = _group_by_postcode(
            left_match,
            "_left_idx",
            "_left_postcode",
            "_left_address",
            left_variant_names,
        )
        del left_match, right_match

        # Build one task per postcode bucket present on both sides.
        tasks = [
            (
                left_entries,
                right_by_postcode[postcode],
                min_score,
                min_score_without_numbers,
            )
            for postcode, left_entries in left_by_postcode.items()
            if postcode in right_by_postcode
        ]

        # Score all pairwise matches in parallel, then greedily assign from
        # highest score downward so best pairs lock in first.
        # Pair tuples are (score, exact, left_idx, right_idx); `exact` marks a
        # literally-equal primary pair so it wins greedy ties against a pair
        # that merely token-sorts to the same score (e.g. "APARTMENT 3 1 HIGH
        # ST" vs "APARTMENT 1 3 HIGH ST" both score 100 against each other's
        # certificates, but each has a literal twin).
        all_pairs: list[tuple[int, int, int, int]] = []
        with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
            for pairs in tqdm(
                executor.map(_score_bucket, tasks, chunksize=64),
                total=len(tasks),
                desc="Fuzzy matching",
            ):
                all_pairs.extend(pairs)
        del tasks, left_by_postcode, right_by_postcode

        # Sort so the best matches are assigned first: score (descending),
        # then literal equality, then ascending left index, then ascending
        # right index. The right index is the final tie-break because the
        # right side comes out of `unique()` without `maintain_order`, so the
        # order in which candidates were scored is not guaranteed; including
        # it makes the ordering total and the assignment reproducible.
        all_pairs.sort(key=lambda t: (-t[0], -t[1], t[2], t[3]))

        # Keep the score alongside each accepted pair: it is emitted as the
        # _match_score audit column so downstream consumers can distinguish
        # exact (100) from looser fuzzy matches.
        matches: list[tuple[int, int, int]] = []  # (left_idx, right_idx, score)
        matched_left: set[int] = set()
        matched_right: set[int] = set()
        for score, _exact, left_idx, right_idx in all_pairs:
            if left_idx in matched_left or right_idx in matched_right:
                continue
            matches.append((left_idx, right_idx, score))
            matched_left.add(left_idx)
            matched_right.add(right_idx)
        del all_pairs, matched_left, matched_right

        # Build a small mapping LazyFrame and join back to the cached parquets.
        # Explicit dtypes keep the schema correct even when `matches` is empty.
        mapping = pl.LazyFrame(
            {
                "_left_idx": pl.Series([m[0] for m in matches], dtype=pl.UInt32),
                "_right_idx": pl.Series([m[1] for m in matches], dtype=pl.UInt32),
                "_match_score": pl.Series([m[2] for m in matches], dtype=pl.UInt8),
            }
        )

        left_cached = pl.scan_parquet(left_path)
        right_cached = pl.scan_parquet(right_path)

        # Collect before the temp directory is removed in `finally`.
        result = (
            left_cached.join(mapping, on="_left_idx", how="left")
            .join(right_cached, on="_right_idx", how="left")
            .drop("_left_idx", "_right_idx")
            .collect(engine="streaming")
        )
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)

    return result.lazy()


def _number_tokens(address: str) -> set[str]:
    """Return the set of number-grade tokens found in ``address``.

    Combines every ``_NUMBER_RE`` match (digits with an optional single
    letter suffix) with the letter captured by ``_FLAT_LETTER_RE`` for
    single-letter flat designators such as "FLAT B".
    """
    tokens = set(_NUMBER_RE.findall(address))
    tokens.update(_FLAT_LETTER_RE.findall(address))
    return tokens


def _numbers_compatible(a: str, b: str) -> bool:
    """Check that the number tokens (house/flat numbers, including any letter
    suffix, plus single-letter flat designators) of two addresses are
    IDENTICAL sets.

    Equality, not subset: subset logic let "188 GREAT NORTH WAY" absorb
    "FLAT 1 188 GREAT NORTH WAY" ({188} is a subset of {1, 188}), attaching a
    single flat's EPC facts to the whole building — tens of thousands of
    wrong-property matches. Likewise digit-only tokens made "8A" and "8B"
    both look like {8} and match each other (and plain "8"), and ungated
    letter flats let "FLAT D 39 X ST" cross-match "FLAT F 39 X ST" at ~96.
    Precision over recall: a pair whose two sources genuinely disagree on
    number tokens is safer left unmatched.

    One side numbered, the other not -> incompatible. Neither numbered ->
    compatible; such pairs are scored against the stricter no-numbers
    threshold instead.
    """
    nums_a = _number_tokens(a)
    nums_b = _number_tokens(b)
    if not nums_a and not nums_b:
        return True
    return nums_a == nums_b


def _admissible_variants(
    primary: str, variants: Sequence[str | None]
) -> tuple[str, ...]:
    """Variants of ``primary`` that are safe to score against the other side.

    A variant may only ADD or DROP whole tokens relative to the primary (one
    word multiset must contain the other) — never substitute, so a register
    row whose address lines disagree with the combined address can't smuggle
    in a different street. The number gate runs on the primary addresses
    only, so the added/dropped tokens must additionally carry no digits
    (house numbers) and no flat designator (a "Flat 1"-style secondary line
    dropped from an EPC address would otherwise let a single flat score as
    the whole building). The remaining admissible difference is exactly the
    harmless kind variants exist for: trailing locality/village/town words.

    Null/empty variants and variants equal to the primary are skipped, and
    duplicates are removed while preserving first-seen order.
    """
    primary_words = Counter(primary.split())
    admissible: list[str] = []
    for variant in variants:
        if not variant or variant == primary:
            continue
        variant_words = Counter(variant.split())
        # Pure addition or pure removal only; anything else is a substitution.
        if not (variant_words <= primary_words or primary_words <= variant_words):
            continue
        changed = (primary_words - variant_words) + (variant_words - primary_words)
        if any(
            any(ch.isdigit() for ch in token) or token in _FLAT_TOKENS
            for token in changed
        ):
            continue
        admissible.append(variant)
    # dict.fromkeys de-duplicates while keeping insertion order.
    return tuple(dict.fromkeys(admissible))


def _score_bucket(
    args: tuple[
        list[tuple[int, str, tuple[str, ...]]],
        list[tuple[int, str, tuple[str, ...]]],
        int,
        int,
    ],
) -> list[tuple[int, int, int, int]]:
    """Score all address pairs within a single postcode bucket.

    ``args`` is ``(left_entries, right_entries, min_score,
    min_score_without_numbers)``, packed into one tuple so the function can
    be used with ``executor.map``. Returns ``(score, exact, left_idx,
    right_idx)`` for every pair that passes the number gate and reaches its
    threshold; ``exact`` is 1 when the primary addresses are literally equal.
    """
    left_entries, right_entries, min_score, min_score_without_numbers = args
    pairs: list[tuple[int, int, int, int]] = []
    for left_row, left_address, left_variants in left_entries:
        left_candidates = (left_address, *left_variants)
        left_numbered = _NUMBER_RE.search(left_address) is not None
        for right_row, right_address, right_variants in right_entries:
            if not _numbers_compatible(left_address, right_address):
                continue
            score = fuzz.token_sort_ratio(left_address, right_address)
            # Variant pairs recover same-property matches where one register
            # carries a locality suffix the other lacks; a variant-only score
            # must clear the near-exact MIN_VARIANT_SCORE bar.
            if score < 100 and (left_variants or right_variants):
                right_candidates = (right_address, *right_variants)
                for left_pos, left_variant in enumerate(left_candidates):
                    for right_pos, right_variant in enumerate(right_candidates):
                        # Position 0 on both sides is the primary pair, which
                        # has already been scored above.
                        if left_pos == 0 and right_pos == 0:
                            continue
                        variant_score = fuzz.token_sort_ratio(
                            left_variant, right_variant
                        )
                        if (
                            variant_score >= MIN_VARIANT_SCORE
                            and variant_score > score
                        ):
                            score = variant_score
            # Number-less pairs (named houses, building-name flats) lack the
            # house-number disambiguator, so require a near-exact match.
            threshold = (
                min_score
                if left_numbered or _NUMBER_RE.search(right_address)
                else min_score_without_numbers
            )
            if score >= threshold:
                pairs.append(
                    (score, int(left_address == right_address), left_row, right_row)
                )
    return pairs