import re
import shutil
import tempfile
from concurrent.futures import ProcessPoolExecutor
from os import cpu_count
from pathlib import Path

import polars as pl
from thefuzz import fuzz
from tqdm import tqdm

from pipeline.local_temp import local_tmp_dir

# A house-number token includes any letter suffix: 8A, 8B and plain 8 are
# three different properties on the same street, so digit-only extraction
# (which collapsed all three to "8") is not enough. Addresses are passed
# through normalize_address_key first, so tokens are uppercase and
# space-separated and [A-Z] suffices for the suffix.
_NUMBER_RE = re.compile(r"\d+[A-Z]?")
_POSTCODE_RE = r"^[A-Z]{1,2}\d[A-Z\d]?\d[A-Z]{2}$"

# A house number is a strong disambiguator, so a numbered, number-compatible
# pair may match on a lower address-similarity score than a number-less one
# (named houses / flats by building name), which must match almost exactly to
# be trusted. Mirrors merge.py's listings convention.
MIN_FUZZY_SCORE = 82
MIN_FUZZY_SCORE_WITHOUT_NUMBERS = 90


def normalize_address_key(s: pl.Expr) -> pl.Expr:
    """Return an expression that canonicalises an address for matching.

    The value is cast to string, uppercased, every run of characters other
    than ``0-9`` / ``A-Z`` is replaced by a single space, and surrounding
    whitespace is stripped. A result that contains no letter at all is
    replaced by null, so such rows never take part in matching.
    """
    normalized = (
        s.cast(pl.String)
        .str.to_uppercase()
        .str.replace_all(r"[^0-9A-Z]+", " ")
        .str.replace_all(r"\s+", " ")
        .str.strip_chars()
    )
    return pl.when(normalized.str.contains(r"[A-Z]")).then(normalized).otherwise(None)


def normalize_postcode_key(s: pl.Expr) -> pl.Expr:
    """Return an expression that canonicalises a postcode for bucketing.

    The value is cast to string, uppercased and stripped of every character
    other than ``A-Z`` / ``0-9`` (so internal spaces disappear). Anything that
    does not then match ``_POSTCODE_RE`` (1-2 letters, a digit, an optional
    letter or digit, a digit, two letters) becomes null.
    """
    normalized = (
        s.cast(pl.String)
        .str.to_uppercase()
        .str.replace_all(r"[^A-Z0-9]+", "")
        .str.strip_chars()
    )
    return (
        pl.when(normalized.str.contains(_POSTCODE_RE)).then(normalized).otherwise(None)
    )


def _group_by_postcode(
    frame: pl.DataFrame, idx_col: str, postcode_col: str, address_col: str
) -> dict[str, list[tuple[int, str]]]:
    """Bucket ``(row index, address)`` pairs of ``frame`` by postcode.

    Rows whose address or postcode is null are skipped.
    """
    buckets: dict[str, list[tuple[int, str]]] = {}
    for idx, postcode, address in zip(
        frame[idx_col], frame[postcode_col], frame[address_col]
    ):
        if address is not None and postcode is not None:
            buckets.setdefault(postcode, []).append((idx, address))
    return buckets


def fuzzy_join_on_postcode(
    left: pl.LazyFrame,
    right: pl.LazyFrame,
    left_address_col: str,
    right_address_col: str,
    left_postcode_col: str,
    right_postcode_col: str,
    min_score: int = MIN_FUZZY_SCORE,
    min_score_without_numbers: int = MIN_FUZZY_SCORE_WITHOUT_NUMBERS,
) -> pl.LazyFrame:
    """Fuzzy join two LazyFrames by matching addresses within postcode buckets.

    Sinks each side to a temporary parquet file so the upstream pipeline
    executes only once. The matching phase collects just three narrow columns
    (index, address, postcode) via projection pushdown, and the final join
    reads the remaining columns lazily.

    The right side is de-duplicated on its normalised (address, postcode) key
    before matching. Candidate pairs are scored per postcode bucket and then
    assigned greedily from the highest score downward, so each left row and
    each right row takes part in at most one match.

    Args:
        left: Left-hand frame; every one of its rows appears in the result.
        right: Right-hand frame whose columns are attached to matched rows.
        left_address_col: Name of the address column in ``left``.
        right_address_col: Name of the address column in ``right``.
        left_postcode_col: Name of the postcode column in ``left``.
        right_postcode_col: Name of the postcode column in ``right``.
        min_score: Minimum token_sort_ratio for pairs that carry number tokens.
        min_score_without_numbers: Minimum token_sort_ratio for pairs in which
            neither address carries a number token.

    Returns:
        A LazyFrame with all left and right columns, plus a ``_match_score``
        (UInt8) audit column holding the token_sort_ratio of the accepted
        match (exact matches score 100). Unmatched rows have null right
        columns and a null score.
    """
    tmpdir = tempfile.mkdtemp(prefix="fuzzy_join_", dir=local_tmp_dir())
    left_path = Path(tmpdir) / "left.parquet"
    right_path = Path(tmpdir) / "right.parquet"

    try:
        # Materialise each side exactly once, with a row index, to temp parquet.
        left.with_row_index("_left_idx").sink_parquet(left_path)
        right.with_row_index("_right_idx").sink_parquet(right_path)

        # Collect only the narrow columns needed for matching (projection pushdown).
        left_match = (
            pl.scan_parquet(left_path)
            .select(
                "_left_idx",
                normalize_address_key(pl.col(left_address_col)).alias("_left_address"),
                normalize_postcode_key(pl.col(left_postcode_col)).alias(
                    "_left_postcode"
                ),
            )
            .collect(engine="streaming")
        )
        right_match = (
            pl.scan_parquet(right_path)
            .select(
                "_right_idx",
                normalize_address_key(pl.col(right_address_col)).alias(
                    "_right_address"
                ),
                normalize_postcode_key(pl.col(right_postcode_col)).alias(
                    "_right_postcode"
                ),
            )
            .unique(subset=["_right_address", "_right_postcode"], keep="first")
            .collect(engine="streaming")
        )

        # Group both sides by postcode for fast lookup.
        right_by_postcode = _group_by_postcode(
            right_match, "_right_idx", "_right_postcode", "_right_address"
        )
        left_by_postcode = _group_by_postcode(
            left_match, "_left_idx", "_left_postcode", "_left_address"
        )
        del left_match, right_match

        # Build one task per postcode that occurs on both sides.
        tasks = [
            (
                left_entries,
                right_by_postcode[postcode],
                min_score,
                min_score_without_numbers,
            )
            for postcode, left_entries in left_by_postcode.items()
            if postcode in right_by_postcode
        ]

        # Score all pairwise matches in parallel, then greedily assign from
        # highest score downward so best pairs lock in first.
        all_pairs: list[tuple[int, int, int]] = []  # (score, left_idx, right_idx)
        with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
            for pairs in tqdm(
                executor.map(_score_bucket, tasks, chunksize=64),
                total=len(tasks),
                desc="Fuzzy matching",
            ):
                all_pairs.extend(pairs)
        del tasks, left_by_postcode, right_by_postcode

        # Sort descending by score so best matches are assigned first; among
        # equal scores the lower left index goes first.
        all_pairs.sort(key=lambda t: (t[0], -t[1]), reverse=True)

        # Keep the score alongside each accepted pair: it is emitted as the
        # _match_score audit column so downstream consumers can distinguish
        # exact (100) from looser fuzzy matches.
        matches: list[tuple[int, int, int]] = []  # (left_idx, right_idx, score)
        matched_left: set[int] = set()
        matched_right: set[int] = set()
        for score, left_idx, right_idx in all_pairs:
            if left_idx in matched_left or right_idx in matched_right:
                continue
            matches.append((left_idx, right_idx, score))
            matched_left.add(left_idx)
            matched_right.add(right_idx)
        del all_pairs, matched_left, matched_right

        # Build a small mapping LazyFrame and join back to the cached parquets.
        # The explicit dtypes keep the columns typed even when `matches` is
        # empty, so no separate empty-case branch is needed.
        mapping = pl.LazyFrame(
            {
                "_left_idx": pl.Series([m[0] for m in matches], dtype=pl.UInt32),
                "_right_idx": pl.Series([m[1] for m in matches], dtype=pl.UInt32),
                "_match_score": pl.Series([m[2] for m in matches], dtype=pl.UInt8),
            }
        )

        left_cached = pl.scan_parquet(left_path)
        right_cached = pl.scan_parquet(right_path)

        # Collect before the temp directory is removed in `finally`; the
        # returned LazyFrame wraps the in-memory result, not the parquet files.
        result = (
            left_cached.join(mapping, on="_left_idx", how="left")
            .join(right_cached, on="_right_idx", how="left")
            .drop("_left_idx", "_right_idx")
            .collect(engine="streaming")
        )
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)

    return result.lazy()


def _number_tokens(address: str) -> frozenset[str]:
    """Return the set of number tokens (digits plus optional letter suffix)."""
    return frozenset(_NUMBER_RE.findall(address))


def _numbers_compatible(a: str, b: str) -> bool:
    """Check that the number tokens (house/flat numbers, including any letter
    suffix) of two addresses are IDENTICAL sets.

    Equality, not subset: subset logic let "188 GREAT NORTH WAY" absorb
    "FLAT 1 188 GREAT NORTH WAY" ({188} is a subset of {1, 188}), attaching a
    single flat's EPC facts to the whole building — tens of thousands of
    wrong-property matches. Likewise digit-only tokens made "8A" and "8B" both
    look like {8} and match each other (and plain "8"). Precision over recall:
    a pair whose two sources genuinely disagree on number tokens is safer left
    unmatched.

    One side numbered, the other not -> incompatible. Neither numbered ->
    compatible; such pairs are scored against the stricter no-numbers
    threshold instead.
    """
    # Two empty sets compare equal, which covers the "neither numbered" case.
    return _number_tokens(a) == _number_tokens(b)


def _score_bucket(
    args: tuple[list[tuple[int, str]], list[tuple[int, str]], int, int],
) -> list[tuple[int, int, int]]:
    """Score all address pairs within a single postcode bucket.

    Takes a single tuple ``(left_entries, right_entries, min_score,
    min_score_without_numbers)`` so it can be used with ``executor.map``.
    Returns ``(score, left_row, right_row)`` for every pair whose number
    tokens are identical and whose token_sort_ratio reaches the applicable
    threshold.
    """
    left_entries, right_entries, min_score, min_score_without_numbers = args

    # Extract number tokens once per address instead of once per pair: the
    # pair loop below is O(len(left) * len(right)) and the regex work does not
    # depend on the other side of the pair.
    right_tokenised = [
        (right_row, right_address, _number_tokens(right_address))
        for right_row, right_address in right_entries
    ]

    pairs: list[tuple[int, int, int]] = []
    for left_row, left_address in left_entries:
        left_numbers = _number_tokens(left_address)
        # Number-less pairs (named houses, building-name flats) lack the
        # house-number disambiguator, so require a near-exact match. Only
        # pairs with identical token sets are scored, so the left side alone
        # decides whether the pair is numbered.
        threshold = min_score if left_numbers else min_score_without_numbers
        for right_row, right_address, right_numbers in right_tokenised:
            if left_numbers != right_numbers:
                continue
            score = fuzz.token_sort_ratio(left_address, right_address)
            if score >= threshold:
                pairs.append((score, left_row, right_row))
    return pairs