216 lines
7.4 KiB
Python
216 lines
7.4 KiB
Python
import re
|
|
import shutil
|
|
import tempfile
|
|
from concurrent.futures import ProcessPoolExecutor
|
|
from os import cpu_count
|
|
from pathlib import Path
|
|
|
|
import polars as pl
|
|
from thefuzz import fuzz
|
|
from tqdm import tqdm
|
|
|
|
from pipeline.local_temp import local_tmp_dir
|
|
|
|
_NUMBER_RE = re.compile(r"\d+")
|
|
_POSTCODE_RE = r"^[A-Z]{1,2}\d[A-Z\d]?\d[A-Z]{2}$"
|
|
MIN_FUZZY_SCORE = 60
|
|
|
|
|
|
def normalize_address_key(s: pl.Expr) -> pl.Expr:
|
|
normalized = (
|
|
s.cast(pl.String)
|
|
.str.to_uppercase()
|
|
.str.replace_all(r"[^0-9A-Z]+", " ")
|
|
.str.replace_all(r"\s+", " ")
|
|
.str.strip_chars()
|
|
)
|
|
return pl.when(normalized.str.contains(r"[A-Z]")).then(normalized).otherwise(None)
|
|
|
|
|
|
def normalize_postcode_key(s: pl.Expr) -> pl.Expr:
|
|
normalized = (
|
|
s.cast(pl.String)
|
|
.str.to_uppercase()
|
|
.str.replace_all(r"[^A-Z0-9]+", "")
|
|
.str.strip_chars()
|
|
)
|
|
return (
|
|
pl.when(normalized.str.contains(_POSTCODE_RE)).then(normalized).otherwise(None)
|
|
)
|
|
|
|
|
|
def fuzzy_join_on_postcode(
|
|
left: pl.LazyFrame,
|
|
right: pl.LazyFrame,
|
|
left_address_col: str,
|
|
right_address_col: str,
|
|
left_postcode_col: str,
|
|
right_postcode_col: str,
|
|
min_score: int = MIN_FUZZY_SCORE,
|
|
) -> pl.LazyFrame:
|
|
"""Fuzzy join two LazyFrames by matching addresses within postcode buckets.
|
|
|
|
Sinks each side to a temporary parquet file so the upstream pipeline
|
|
executes only once. The matching phase collects just three narrow
|
|
columns (index, address, postcode) via projection pushdown, and the
|
|
final join reads the remaining columns lazily.
|
|
|
|
Returns a LazyFrame with all left and right columns. Unmatched rows
|
|
have null right columns.
|
|
"""
|
|
|
|
tmpdir = tempfile.mkdtemp(prefix="fuzzy_join_", dir=local_tmp_dir())
|
|
left_path = Path(tmpdir) / "left.parquet"
|
|
right_path = Path(tmpdir) / "right.parquet"
|
|
|
|
try:
|
|
# Materialise each side exactly once, with a row index, to temp parquet.
|
|
left.with_row_index("_left_idx").sink_parquet(left_path)
|
|
right.with_row_index("_right_idx").sink_parquet(right_path)
|
|
|
|
# Collect only the narrow columns needed for matching (projection pushdown).
|
|
left_match = (
|
|
pl.scan_parquet(left_path)
|
|
.select(
|
|
"_left_idx",
|
|
normalize_address_key(pl.col(left_address_col)).alias("_left_address"),
|
|
normalize_postcode_key(pl.col(left_postcode_col)).alias(
|
|
"_left_postcode"
|
|
),
|
|
)
|
|
.collect(engine="streaming")
|
|
)
|
|
|
|
right_match = (
|
|
pl.scan_parquet(right_path)
|
|
.select(
|
|
"_right_idx",
|
|
normalize_address_key(pl.col(right_address_col)).alias(
|
|
"_right_address"
|
|
),
|
|
normalize_postcode_key(pl.col(right_postcode_col)).alias(
|
|
"_right_postcode"
|
|
),
|
|
)
|
|
.unique(subset=["_right_address", "_right_postcode"], keep="first")
|
|
.collect(engine="streaming")
|
|
)
|
|
|
|
# Group right side by postcode for fast lookup
|
|
right_by_postcode: dict[str, list[tuple[int, str]]] = {}
|
|
for idx, postcode, address in zip(
|
|
right_match["_right_idx"],
|
|
right_match["_right_postcode"],
|
|
right_match["_right_address"],
|
|
):
|
|
if address is not None and postcode is not None:
|
|
right_by_postcode.setdefault(postcode, []).append((idx, address))
|
|
|
|
# Group left side by postcode
|
|
left_by_postcode: dict[str, list[tuple[int, str]]] = {}
|
|
for idx, postcode, address in zip(
|
|
left_match["_left_idx"],
|
|
left_match["_left_postcode"],
|
|
left_match["_left_address"],
|
|
):
|
|
if address is not None and postcode is not None:
|
|
left_by_postcode.setdefault(postcode, []).append((idx, address))
|
|
|
|
del left_match, right_match
|
|
|
|
# Build tasks for each postcode bucket
|
|
tasks = [
|
|
(left_entries, right_by_postcode[postcode], min_score)
|
|
for postcode, left_entries in left_by_postcode.items()
|
|
if postcode in right_by_postcode
|
|
]
|
|
|
|
# Score all pairwise matches in parallel, then greedily assign from
|
|
# highest score downward so best pairs lock in first.
|
|
all_pairs: list[tuple[int, int, int]] = [] # (score, left_idx, right_idx)
|
|
with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
|
|
for pairs in tqdm(
|
|
executor.map(_score_bucket, tasks, chunksize=64),
|
|
total=len(tasks),
|
|
desc="Fuzzy matching",
|
|
):
|
|
all_pairs.extend(pairs)
|
|
|
|
del tasks, left_by_postcode, right_by_postcode
|
|
|
|
# Sort descending by score so best matches are assigned first
|
|
all_pairs.sort(key=lambda t: (t[0], -t[1]), reverse=True)
|
|
|
|
matches: list[tuple[int, int]] = []
|
|
matched_left: set[int] = set()
|
|
matched_right: set[int] = set()
|
|
|
|
for _score, left_idx, right_idx in all_pairs:
|
|
if left_idx in matched_left or right_idx in matched_right:
|
|
continue
|
|
matches.append((left_idx, right_idx))
|
|
matched_left.add(left_idx)
|
|
matched_right.add(right_idx)
|
|
|
|
del all_pairs, matched_left, matched_right
|
|
|
|
# Build a small mapping LazyFrame and join back to the cached parquets.
|
|
if matches:
|
|
mapping = pl.LazyFrame(
|
|
{
|
|
"_left_idx": pl.Series([m[0] for m in matches], dtype=pl.UInt32),
|
|
"_right_idx": pl.Series([m[1] for m in matches], dtype=pl.UInt32),
|
|
}
|
|
)
|
|
else:
|
|
mapping = pl.LazyFrame(
|
|
{
|
|
"_left_idx": pl.Series([], dtype=pl.UInt32),
|
|
"_right_idx": pl.Series([], dtype=pl.UInt32),
|
|
}
|
|
)
|
|
|
|
left_cached = pl.scan_parquet(left_path)
|
|
right_cached = pl.scan_parquet(right_path)
|
|
|
|
result = (
|
|
left_cached.join(mapping, on="_left_idx", how="left")
|
|
.join(right_cached, on="_right_idx", how="left")
|
|
.drop("_left_idx", "_right_idx")
|
|
.collect(engine="streaming")
|
|
)
|
|
finally:
|
|
shutil.rmtree(tmpdir, ignore_errors=True)
|
|
|
|
return result.lazy()
|
|
|
|
|
|
def _numbers_compatible(a: str, b: str) -> bool:
|
|
"""Check that numeric tokens (flat/house numbers) in the shorter set are a subset of the longer.
|
|
|
|
Returns False if one address has numbers and the other doesn't.
|
|
"""
|
|
nums_a = set(_NUMBER_RE.findall(a))
|
|
nums_b = set(_NUMBER_RE.findall(b))
|
|
smaller, larger = (
|
|
(nums_a, nums_b) if len(nums_a) <= len(nums_b) else (nums_b, nums_a)
|
|
)
|
|
if not smaller and larger:
|
|
return False
|
|
return smaller.issubset(larger)
|
|
|
|
|
|
def _score_bucket(
|
|
args: tuple[list[tuple[int, str]], list[tuple[int, str]], int],
|
|
) -> list[tuple[int, int, int]]:
|
|
"""Score all address pairs within a single postcode bucket."""
|
|
left_entries, right_entries, min_score = args
|
|
pairs = []
|
|
for left_row, left_address in left_entries:
|
|
for right_row, right_address in right_entries:
|
|
if not _numbers_compatible(left_address, right_address):
|
|
continue
|
|
score = fuzz.token_sort_ratio(left_address, right_address)
|
|
if score >= min_score:
|
|
pairs.append((score, left_row, right_row))
|
|
return pairs
|